<source id="wfbuo"><optgroup id="wfbuo"></optgroup></source>
<rt id="wfbuo"></rt><strong id="wfbuo"></strong>
<rt id="wfbuo"><optgroup id="wfbuo"></optgroup></rt>
<rt id="wfbuo"></rt>

  • <strong id="wfbuo"></strong>

    <menuitem id="wfbuo"></menuitem>
  • <strong id="wfbuo"></strong>
  • 當前位置 博文首頁 > 等你歸去來:java線程池趣味事:這不是線程池

      等你歸去來:java線程池趣味事:這不是線程池

      作者:等你歸去來 時間:2021-02-20 16:33

        要想寫出高性能高并發的應用,自然有許多關鍵,如io,算法,異步,語言特性,操作系統特性,隊列,內存,cpu,分布式,網絡,數據結構,高性能組件。

        胡說一通先。

        回到主題,線程池。如果說多線程是提高系統并發能力利器之一,那么線程池就是讓這個利器更容易控制的一種工具。如果我們自己純粹使用多線程基礎特性編寫,那么,必然需要相當老道的經驗,才能夠駕馭復雜的環境。而線程池則不需要,你只需知道如何使用,即可輕松掌控多線程,安全地為你服務。

       

      1. 常見線程池的應用樣例

        線程池,不說本身很簡單,但應用一定是簡單的。

        線程池有許多的實現,但我們只說 ThreadPoolExecutor 版本,因其應用最廣泛,別無其他。當然了,還有一個定時調度線程池 ScheduledThreadPoolExecutor 另說,因其需求場景不同,無法比較。

        下面,我就幾個應用級別,說明下我們如何快速使用線程池。(走走過場而已,無關其他)

       

      1.1. 初級線程池

        初級版本的使用線程池,只需要借助一個工具類即可: Executors . 它提供了許多靜態方法,你只需隨便選一個就可以使用線程池了。比如:

      // 創建固定數量的線程池
      Executors.newFixedThreadPool(8);
      // 創建無限動態創建的線程池
      Executors.newCachedThreadPool();
      // 創建定時調度線程池
      Executors.newScheduledThreadPool(2);
      // 還有個創建單線程的就不說了,都一樣

        使用上面這些方法創建好的線程池,直接調用其 execute() 或者 submit() 方法,就可以實現多線程編程了。沒毛!

       

      1.2. 中級線程池

        我這里所說的中級,實際就是不使用以上超級簡單方式使用線程池的方式。即你已經知道了 ThreadPoolExecutor 這個東東了。這不管你的出發點是啥!

      // 自定義各線程參數
      ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 20, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        具體參數解釋就不說了,咱們不掃盲?傊,使用這玩意兒,說明你已經開始有點門道了。

       

      1.3. 高級線程池

        實際上,這個版本就沒法具體說如何做了。

        但它可能是,你知道你的線程池應用場景的,你清楚你的硬件運行環境的,你會使用線程池命名的,你會定義你的隊列大小的,你會考慮上下文切換的,你會考慮線程安全的,你會考慮鎖性能的,你可能會自己造個輪子的。。。

       

      2. 這不是線程池

        我們通常理解的線程池,就是能夠同時跑多個任務的地方。但有時候線程池不一像線程池,而像一個單線程。來看一個具體的簡單的線程池的使用場景:

          // 初始化線程池
          private ExecutorService executor
                  = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                      Runtime.getRuntime().availableProcessors(),
                      0L, TimeUnit.SECONDS,
                      new ArrayBlockingQueue<>(50),
                      new NamedThreadFactory("test-pool"),
                      new ThreadPoolExecutor.CallerRunsPolicy());
          // 使用線程池處理任務
          public Integer doTask(String updateIntervalDesc) throws Exception {
              long startTime = System.currentTimeMillis();
              List<TestDto> testList;
              AtomicInteger affectNum = new AtomicInteger(0);
              int pageSize = 1000;
              AtomicInteger pageNo = new AtomicInteger(1);
              Map<String, Object> condGroupLabel = new HashMap<>();
              log.info("start do sth:{}", updateIntervalDesc);
              List<Future<?>> futureList = new ArrayList<>();
              do {
                  PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
                  List<TestDto> list
                          = testDao.getLabelListNew(condGroupLabel);
                  testList = list;
                  // 循環向線程池中提交任務
                  for (TestDto s : list) {
                      Future<?> future = executor.submit(() -> {
                          try {
                              // do sth...
                              affectNum.incrementAndGet();
                          }
                          catch (Throwable e) {
                              log.error("error:{}", pageNo.get(), e);
                          }
                      });
                      futureList.add(future);
                  }
              } while (testList.size() >= pageSize);
              // 等待任務完成
              int i = 0;
              for (Future<?> future : futureList) {
                  future.get();
                  log.info("done:+{} ", i++);
              }
              log.info("doTask done:{}, num:{}, cost:{}ms",
                      updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);
              return affectNum.get();
          }

        主要業務就是,從數據庫中取出許多任務,放入線程池中運行。因為任務又涉及到db等的io操作,所以使用多線程處理,非常合理。

        然而,有一種情況的出現,也許會打破這個平衡:那就是當單個任務能夠快速執行完成時,而且快到剛上一任務提交完成,還沒等下一次提交時,就任務就已被執行完成。這時,你就可能會看到一個神奇的現象,即一直只有一個線程在運行任務。這不是線程池該干的事,更像是單線程任務在跑。

        然后,我們可能開始懷疑:某個線程被阻塞了?線程調度不公平了?隊列選擇不正確了?觸發jdk bug了?線程池未完全利用的線程了?等等。。。

        然而結果并非如此,糾其原因只是當我們向線程池提交任務時,實際上只是向線程池的隊列中添加了任務。即上面顯示的 ArrayBlockingQueue 添加了任務,而線程池中的各worker負責從隊列中獲取任務進行執行。而當任務數很少時,自然只有一部分worker會處理執行中了。至于為什么一直是同一個線程在執行,則可能是由于jvm的調度機制導致。事實上,是受制于 ArrayBlockingQueue.poll() 的公平性。而這個poll()的實現原理,則是由 wait/notify 機制的公平性決定的。

       

        如下,是線程池的worker工作原理:

          // java.util.concurrent.ThreadPoolExecutor#runWorker
          /**
           * Main worker run loop.  Repeatedly gets tasks from queue and
           * executes them, while coping with a number of issues:
           *
           * 1. We may start out with an initial task, in which case we
           * don't need to get the first one. Otherwise, as long as pool is
           * running, we get tasks from getTask. If it returns null then the
           * worker exits due to changed pool state or configuration
           * parameters.  Other exits result from exception throws in
           * external code, in which case completedAbruptly holds, which
           * usually leads processWorkerExit to replace this thread.
           *
           * 2. Before running any task, the lock is acquired to prevent
           * other pool interrupts while the task is executing, and then we
           * ensure that unless pool is stopping, this thread does not have
           * its interrupt set.
           *
           * 3. Each task run is preceded by a call to beforeExecute, which
           * might throw an exception, in which case we cause thread to die
           * (breaking loop with completedAbruptly true) without processing
           * the task.
           *
           * 4. Assuming beforeExecute completes normally, we run the task,
           * gathering any of its thrown exceptions to send to afterExecute.
           * We separately handle RuntimeException, Error (both of which the
           * specs guarantee that we trap) and arbitrary Throwables.
           * Because we cannot rethrow Throwables within Runnable.run, we
           * wrap them within Errors on the way out (to the thread's
           * UncaughtExceptionHandler).  Any thrown exception also
           * conservatively causes thread to die.
           *
           * 5. After task.run completes, we call afterExecute, which may
           * also throw an exception, which will also cause thread to
           * die. According to JLS Sec 14.20, this exception is the one that
           * will be in effect even if task.run throws.
           *
           * The net effect of the exception mechanics is that afterExecute
           * and the thread's UncaughtExceptionHandler have as accurate
           * information as we can provide about any problems encountered by
           * user code.
           *
           * @param w the worker
           */
          final void runWorker(Worker w) {
              Thread wt = Thread.currentThread();
              Runnable task = w.firstTask;
              w.firstTask = null;
              w.unlock(); // allow interrupts
              boolean completedAbruptly = true;
              try {
                  // worker 不停地向隊列中獲取任務,然后執行
                  // 其中獲取任務的過程,可能被中斷,也可能不會,受到線程池伸縮配置的影響
                  while (task != null || (task = getTask()) != null) {
                      w.lock();
                      // If pool is stopping, ensure thread is interrupted;
                      // if not, ensure thread is not interrupted.  This
                      // requires a recheck in second case to deal with
                      // shutdownNow race while clearing interrupt
                      if ((runStateAtLeast(ctl.get(), STOP) ||
                           (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                          !wt.isInterrupted())
                          wt.interrupt();
                      try {
                          beforeExecute(wt, task);
                          Throwable thrown = null;
                          try {
                              task.run();
                          } catch (RuntimeException x) {
                              thrown = x; throw x;
                          } catch (Error x) {
                              thrown = x; throw x;
                          } catch (Throwable x) {
                              thrown = x; throw new Error(x);
                          } finally {
                              afterExecute(task, thrown);
                          }
                      } finally {
                          task = null;
                          w.completedTasks++;
                          w.unlock();
                      }
                  }
                  completedAbruptly = false;
              } finally {
                  processWorkerExit(w, completedAbruptly);
              }
          }
          /**
           * Performs blocking or timed wait for a task, depending on
           * current configuration settings, or returns null if this worker
           * must exit because of any of:
           * 1. There are more than maximumPoolSize workers (due to
           *    a call to setMaximumPoolSize).
           * 2. The pool is stopped.
           * 3. The pool is shutdown and the queue is empty.
           * 4. This worker timed out waiting for a task, and timed-out
           *    workers are subject to termination (that is,
           *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
           *    both before and after the timed wait, and if the queue is
           *    non-empty, this worker is not the last thread in the pool.
           *
           * @return task, or null if the worker must exit, in which case
           *         workerCount is decremented
           */
          private Runnable getTask() {
              boolean timedOut = false; // Did the last poll() time out?
      
              for (;;) {
                  int c = ctl.get();
                  int rs = runStateOf(c);
      
                  // Check if queue empty only if necessary.
                  if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                      decrementWorkerCount();
                      return null;
                  }
      
                  int wc = workerCountOf(c);
      
                  // Are workers subject to culling?
                  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      
                  if ((wc > maximumPoolSize || (timed && timedOut))
                      && (wc > 1 || workQueue.isEmpty())) {
                      if (compareAndDecrementWorkerCount(c))
                          return null;
                      continue;
                  }
      
                  try {
                      // 可能調用超時方法,也可能調用阻塞方法
                      // 固定線程池的情況下,調用阻塞 take() 方法
                      Runnable r = timed ?
                          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                          workQueue.take();
                      if (r != null)
                          return r;
                      timedOut = true;
                  } catch (InterruptedException retry) {
                      timedOut = false;
                  }
              }
          }

        即線程池worker持續向隊列獲取任務,執行即可。而隊列任務的獲取,則由兩個讀寫鎖決定:

          // java.util.concurrent.ArrayBlockingQueue#take
          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              // 此處鎖,保證執行線程安全性
              lock.lockInterruptibly();
              try {
                  while (count == 0)
                      // 此處釋放鎖等待,再次喚醒時,要求必須重新持有鎖
                      notEmpty.await();
                  return dequeue();
              } finally {
                  lock.unlock();
              }
          }
          // 
          /**
           * Inserts the specified element at the tail of this queue, waiting
           * for space to become available if the queue is full.
           *
           * @throws InterruptedException {@inheritDoc}
           * @throws NullPointerException {@inheritDoc}
           */
          public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  while (count == items.length)
                      notFull.await();
                  enqueue(e);
              } finally {
                  lock.unlock();
              }
          }
          /**
           * Inserts element at current put position, advances, and signals.
           * Call only when holding lock.
           */
          private void enqueue(E x) {
              // assert lock.getHoldCount() == 1;
              // assert items[putIndex] == null;
              final Object[] items = this.items;
              items[putIndex] = x;
              if (++putIndex == items.length)
                  putIndex = 0;
              count++;
              // 通知取等線程,喚醒
              notEmpty.signal();
          }

        所以,具體誰取到任務,就是要看誰搶到了鎖。而這,可能又涉及到jvm的高效調度策略啥的了吧。(雖然不確定,但感覺像) 至少,任務運行的表象是,所有任務被某個線程一直搶到。

       

      3. 回歸線程池

        線程池的目的,在于處理一些異步的任務,或者并發的執行多個無關聯的任務。在于讓系統減負。而當任務的提交消耗,大于了任務的執行消耗,那就沒必要使用多線程了,或者說這是錯誤的用法了。我們應該線程池做更重的活,而不是輕量級的。如上問題,執行性能必然很差。但我們稍做轉變,也許就不一樣了。

          // 初始化線程池
          private ExecutorService executor
                  = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                      Runtime.getRuntime().availableProcessors(),
                      0L, TimeUnit.SECONDS,
                      new ArrayBlockingQueue<>(50),
                      new NamedThreadFactory("test-pool"),
                      new ThreadPoolExecutor.CallerRunsPolicy());
          // 使用線程池處理任務
          public Integer doTask(String updateIntervalDesc) throws Exception {
              long startTime = System.currentTimeMillis();
              List<TestDto> testList;
              AtomicInteger affectNum = new AtomicInteger(0);
              int pageSize = 1000;
              AtomicInteger pageNo = new AtomicInteger(1);
              Map<String, Object> condGroupLabel = new HashMap<>();
              log.info("start do sth:{}", updateIntervalDesc);
              List<Future<?>> futureList = new ArrayList<>();
              do {
                  PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
                  List<TestDto> list
                          = testDao.getLabelListNew(condGroupLabel);
                  testList = list;
                  // 一批任務只向線程池中提交任務
                  Future<?> future = executor.submit(() -> {
                      for (TestDto s : list) {
                          try {
                              // do sth...
                              affectNum.incrementAndGet();
                          }
                          catch (Throwable e) {
                              log.error("error:{}", pageNo.get(), e);
                          }
                      }
                  });
                  futureList.add(future);
              } while (testList.size() >= pageSize);
              //
      
      下一篇:沒有了
    推土機2019:ImageCombiner - Java服務端圖片合成工具,好用! 萊布尼茨:【從零開始擼一個App】Fragment和導航中的使用 等不到的口琴:億級流量架構之資源隔離思路與方法 TOP生物信息:一文學會常規轉錄組分析 Twittytop:我的2020之路 我愛睡蓮:keepalived-1.3.5+MHA部署mysql集群 程序員養成日記:mysql一張表到底能存多少數據? 等你歸去來:java線程池趣味事:這不是線程池 Linyiwei:C++算法代碼――Tuna Linux中執行shell腳本的4種方法總結 關于ios配置微信config出現驗簽失敗的問題解決 ASP.NET Core擴展庫之Http通用擴展庫的使用詳解 Java在Excel中添加水印的實現(單一水印、平鋪水印) 基于UDP協議實現聊天系統 R語言中ifelse、which、%in%的用法詳解 constant,Java中定義常量(Constant)的幾種方法 float,CSS Float(浮動),CSS Float(浮動)用法 fixed,JavaScript fixed() 方法,JavaScript fixed() 實例 sprintf,sprintf 函數用法詳細注解 html代碼 php網站,php網站搭建步驟,PHP環境搭建教程 調試js,JS調試,js調試工具 xml是什么.xml格式文件,xml怎么用 jsswitch語句,JavaScript Switch 語句,JavaScript Switch用法 html5 教程,什么是 HTML5,HTML5都有什么功能 css,通過JS修改CSS樣式 網站建設哪,什么是網站建設?網站建設的常見要素有哪些? html網站,前端html網站的發布過程 document.cookie,使用document對象操作cookie javascript 數組,JavaScript數組去掉重復數據總結
    欧美日韩免费无码