热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:多线程高并发编程Fork/Join源码分析

本文由编程笔记#小编为大家整理,主要介绍了多线程高并发编程--Fork/Join源码分析相关的知识,希望对你有一定的参考价值。一.概念Fork/J
本文由编程笔记#小编为大家整理,主要介绍了多线程高并发编程 -- Fork/Join源码分析相关的知识,希望对你有一定的参考价值。

一.概念

  Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果。

技术图片

  流程:任务继承RecursiveTask,重写compute方法,使用ForkJoinPool的submit提交任务,任务在某个线程中运行,工作任务中的compute方法的代码开始对任务进行分析,如果符合条件就进行任务拆分,拆分成多个子任务,每个子任务进行数据的计算或操作,得到结果返回给上一层任务开启线程进行合并,最终通过get获取整体处理结果。【只能将任务1个切分为两个,不能切分为3个或其他数量

技术图片


  • ForkJoinTask:代表fork/join里面的任务类型,一般用它的两个子类RecursiveTask(任务有返回值)和RecursiveAction(任务没有返回值),任务的处理逻辑包括任务的切分都是在重写compute方法里面进行处理。只有ForkJoinTask任务可以被拆分运行和合并运行。【可查看上篇Future源码分析的类图结构】【ForkJoinTask使用了模板模式进行设计,将ForkJoinTask的执行相关代码进行隐藏,通过提供抽象类(即子类RecursiveTask、RecursiveAction)暴露用户的实际业务处理。】

    • RecursiveTask:在进行exec之后会使用一个result的变量进行接受返回的结果;

      public abstract class RecursiveTask extends ForkJoinTask {
      V result;
      protected abstract V compute();
      public final V getRawResult() {
      return result;
      }
      protected final void setRawResult(V value) {
      result
      = value;
      }
      protected final boolean exec() {
      result
      = compute();
      return true;
      }
      }



    • RecursiveAction:在进行exec之后没有返回结果;

      public abstract class RecursiveAction extends ForkJoinTask {
      protected abstract void compute();
      public final Void getRawResult() { return null; }
      protected final void setRawResult(Void mustBeNull) { }
      protected final boolean exec() {
      compute();
      return true;
      }
      }
       





  • ForkJoinPool:fork/join框架的管理者,最原始的任务都要交给它来处理。它负责控制整个fork/join有多少个工作线程,工作线程的创建、机会都是由它来控制。它还负责workQueue队列的创建和分配,每当创建一个工作线程,它负责分配对应的workQueue,然后它把接到的活都交给工作线程去处理。是整个fork/join的容器。

    • ForkJoinPool.WorkQueue:双端队列,负责存储接收的任务;



  • ForkJoinWorkerThread:fork/join里面真正干活的”工人“,它继承了Thread,所以本质是一个线程。它有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue,然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool销毁了,它也会跟着结束。【每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列workQueue。】

    • 当使用ForkJoinPool进行submit任务提交时,创建1个workQueue将任务放进去,然后进行fork任务切分,如果切分后的任务放的进去之前的workQueue就放进去,不行就随机选取workQueue放进去,如果还放不了就创建一个新的workQueue放进去;技术图片




public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool;
final ForkJoinPool.WorkQueue workQueue;
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
}


二.用法

  以前1+2+3+...+100这样的处理可以用for循环处理,现在使用fork/join来处理:从下面结果可以看到,大任务被不断的拆分成小任务,然后添加到工作线程的队列中,每个小任务都会被工作线程从队列中取出进行运行,然后每个小任务的结果的合并也由工作线程执行,然后不断的汇总成最终结果。【task通过ForkJoinPool来执行,分割的子任务添加到当前工作线程的队列中,进入队列的头部,当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务。(工作窃取:当前工作线程对应的队列中没有任务了,从其他工作线程对应的队列中取出任务进行操作,然后将操作结果返还给对应队列的线程。)】

public class MyFrokJoinTask extends RecursiveTask {
private int begin;
private int end;
public MyFrokJoinTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
public static void main(String[] args) throws Exception {
ForkJoinPool pool
= new ForkJoinPool();
ForkJoinTask
result = pool.submit(new MyFrokJoinTask(1, 100));//提交任务
System.out.println("计算的值:"+result.get());//得到最终的结果

}
@Override
protected Integer compute() {
int sum = 0;
if (end - begin <= 2) {
for (int i = begin; i <= end; i++) {
sum
+= i;
System.out.println(
"i:"+i);
}
}
else {
MyFrokJoinTask d1
= new MyFrokJoinTask(begin, (begin + end) / 2);
MyFrokJoinTask d2
= new MyFrokJoinTask((begin + end) / 2+1, end);
d1.fork();
//任务拆分
d2.fork();//任务拆分
Integer a = d1.join();//每个任务的结果
Integer b = d2.join();//每个任务的结果
sum = a + b;//汇总任务结果
System.out.println("sum:" + sum + ",a:" + a + ",b:" + b);
}
System.out.println(
"name:"+Thread.currentThread().getName());
return sum;
}
}
//=========结果============
i:1
i:
2
name:ForkJoinPool
-1-worker-1
i:
3
i:
4
name:ForkJoinPool
-1-worker-1
sum:
10,a:3,b:7
name:ForkJoinPool
-1-worker-1
i:
5
i:
6
i:
7
name:ForkJoinPool
-1-worker-1
sum:
28,a:10,b:18
name:ForkJoinPool
-1-worker-1
...............
...............
sum:
91,a:28,b:63
sum:
99,a:45,b:54
name:ForkJoinPool
-1-worker-3
name:ForkJoinPool
-1-worker-1
i:
23
i:
24
i:
25
name:ForkJoinPool
-1-worker-2
sum:
135,a:63,b:72
name:ForkJoinPool
-1-worker-2
sum:
234,a:99,b:135
name:ForkJoinPool
-1-worker-3
sum:
325,a:91,b:234
name:ForkJoinPool
-1-worker-1
sum:
1275,a:325,b:950
name:ForkJoinPool
-1-worker-1
sum:
5050,a:1275,b:3775
name:ForkJoinPool
-1-worker-1
计算的值:
5050


三.分析


  ForkJoinPool


ForkJoinPool forkJoinPool = new ForkJoinPool();
//Runtime.getRuntime().availableProcessors()当前操作系统可以使用的CPU内核数量
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory,
null, false);
}
//this调用到下面这段代码
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism), //并行度
checkFactory(factory), //工作线程创建工厂
handler, //异常处理handler
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任务队列出队模式 异步:先进先出,同步:后进先出
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
//上面的this最终调用到下面这段代码
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.cOnfig= (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np < TC_MASK);
}



  • parallelism:可并行数量,fork/join框架将依据这个并行数量的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理;

  • factory当fork/join创建一个新的线程时,同样会用到线程创建工厂。它实现了ForkJoinWorkerThreadFactory接口,使用默认的的接口实现类DefaultForkJoinWorkerThreadFactory来实现newThread方法创建一个新的工作线程;

    public static interface ForkJoinWorkerThreadFactory {
    /**
    * Returns a new worker thread operating in the given pool.
    */
    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
    static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
    }
    }



  • handler:异常捕获处理器。当执行的任务出现异常,并从任务中被抛出时,就会被handler捕获;

  • asyncMode:fork/join为每一个独立的工作线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即可以使用先进先出的工作模式,也可以使用后进先出的工作模式;技术图片


   Fork()和Join()

  fork/join框架中提供的fork()和join()是最重要的两个方法,它们和parallelism(”可并行任务数量“)配合工作,可以导致拆分的子任务T1.1、T1.2甚至TX在fork/join中不同的运行效果(上面1+2....+100的每次运行的子任务都是不同的)。即TX子任务或等待其他已存在的线程运行关联的子任务(sum操作),或在运行TX的线程中”递归“执行其他任务(将1-50进行拆分后的子任务递归运行),或启动一个新的线程执行子任务(运行1-50另一边拆分的任务,即50-100的子任务)。

  fork()用于将新创建的子任务放入当前线程的workQueue队列中,fork/join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkedThread线程运行它,又或者是唤起其他正在等待任务的ForkJoinWorkerThread线程运行它。

  join()用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列workQueue中,则取出这个子任务进行”递归“执行,其目的是尽快得到当前子任务的运行结果,然后继续执行。

  提交任务:




  1.  sumbit的第一次提交:ForkJoinPool.submit(ForkJoinTask task) -> externalPush(task) -> externalSubmit(task)



    1. submit:

      public ForkJoinTask submit(ForkJoinTask task) {
      if (task == null)
      throw new NullPointerException();
      externalPush(task);
      return task;
      }
      public ForkJoinTask submit(Callable task) {
      ForkJoinTask
      job = new ForkJoinTask.AdaptedCallable(task);
      externalPush(job);
      return job;
      }
      public ForkJoinTask submit(Runnable task, T result) {
      ForkJoinTask
      job = new ForkJoinTask.AdaptedRunnable(task, result);
      externalPush(job);
      return job;
      }
      public ForkJoinTask submit(Runnable task) {
      if (task == null)
      throw new NullPointerException();
      ForkJoinTask
      job;
      if (task instanceof ForkJoinTask) // avoid re-wrap
      job = (ForkJoinTask) task;
      else
      job
      = new ForkJoinTask.AdaptedRunnableAction(task);
      externalPush(job);
      return job;
      }



    2. externalPush:将任务添加到随机选取的队列中或新创建的队列中;

      final void externalPush(ForkJoinTask task) {
      WorkQueue[] ws; WorkQueue q;
      int m;
      int r = ThreadLocalRandom.getProbe();//当前线程的一个随机数
      int rs = runState;//当前容器的状态
      //如果随机选取的队列还有空位置可以存放、队列加锁锁定成功,任务就放入队列中
      if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
      (q
      = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
      U.compareAndSwapInt(q, QLOCK,
      0, 1)) {
      ForkJoinTask
      [] a; int am, n, s;
      if ((a = q.array) != null &&
      (am
      = a.length - 1) > (n = (s = q.top) - q.base)) {
      int j = ((am & s) < ABASE;
      U.putOrderedObject(a, j, task);//任务加入队列中
      U.putOrderedInt(q, QTOP, s + 1);//挪动下次任务存放的槽的位置
      U.putIntVolatile(q, QLOCK, 0);//队列解锁
      if (n <= 1)//当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
      signalWork(ws, q);
      return;
      }
      U.compareAndSwapInt(q, QLOCK,
      1, 0);//队列解锁
      }
      externalSubmit(task);
      //升级版的externalPush
      }
      volatile int runState; // lockable status锁定状态
      // runState: SHUTDOWN为负数,其他的为2的次幂
      private static final int RSLOCK = 1;
      private static final int RSIGNAL = 1 <<1;//唤醒
      private static final int STARTED = 1 <<2;//启动
      private static final int STOP = 1 <<29;//停止
      private static final int TERMINATED = 1 <<30;//结束
      private static final int SHUTDOWN = 1 <<31;//关闭



    3. externalSubmit:队列添加任务失败,进行升级版操作,即创建队列数组和创建队列后,将任务放入新创建的队列中;

      private void externalSubmit(ForkJoinTask task) {
      int r; // initialize caller‘s probe
      if ((r = ThreadLocalRandom.getProbe()) == 0) {
      ThreadLocalRandom.localInit();
      r
      = ThreadLocalRandom.getProbe();
      }
      for (;;) {//自旋
      WorkQueue[] ws; WorkQueue q; int rs, m, k;
      boolean move = false;
      /**
      *ForkJoinPool执行器停止工作了,抛出异常
      *ForkJoinPool extends AbstractExecutorService
      *abstract class AbstractExecutorService implements ExecutorService
      *interface ExecutorService extends Executor
      *interface Executor执行提交的对象Runnable任务
      */
      if ((rs = runState) <0) {
      tryTerminate(
      false, false); // help terminate
      throw new RejectedExecutionException();
      }
      //第一次遍历,队列数组未创建,进行创建
      else if ((rs & STARTED) == 0 || // initialize初始化
      ((ws = workQueues) == null || (m = ws.length - 1) <0)) {
      int ns = 0;
      rs
      = lockRunState();
      try {
      if ((rs & STARTED) == 0) {
      U.compareAndSwapObject(
      this, STEALCOUNTER, null,
      new AtomicLong());
      // create workQueues array with size a power of two
      int p = config & SMASK; // ensure at least 2 slots,config是CPU核数
      int n = (p > 1) ? p - 1 : 1;
      n
      |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
      n
      |= n >>> 8; n |= n >>> 16; n = (n + 1) <<1;
      workQueues
      = new WorkQueue[n];//创建
      ns = STARTED;
      }
      }
      finally {
      unlockRunState(rs, (rs
      & ~RSLOCK) | ns);
      }
      }
      //第三次遍历,把任务放入队列中
      else if ((q = ws[k = r & m & SQMASK]) != null) {
      if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
      ForkJoinTask
      [] a = q.array;
      int s = q.top;
      boolean submitted = false; // initial submission or resizing
      try { // locked version of push
      if ((a != null && a.length > s + 1 - q.base) ||
      (a
      = q.growArray()) != null) {
      int j = (((a.length - 1) & s) < ABASE;
      U.putOrderedObject(a, j, task);
      U.putOrderedInt(q, QTOP, s + 1);
      submitted
      = true;
      }
      }
      finally {
      U.compareAndSwapInt(q, QLOCK,
      1, 0);
      }
      if (submitted) {
      signalWork(ws, q);
      return;
      }
      }
      move
      = true; // move on failure
      }
      //第二次遍历,队列数组为空,创建队列
      else if (((rs = runState) & RSLOCK) == 0) { // create new queue
      q = new WorkQueue(this, null);
      q.hint
      = r;
      q.config
      = k | SHARED_QUEUE;
      q.scanState
      = INACTIVE;
      rs
      = lockRunState(); // publish index
      if (rs > 0 && (ws = workQueues) != null &&
      k
      null)
      ws[k]
      = q; // else terminated
      unlockRunState(rs, rs & ~RSLOCK);
      }
      else
      move
      = true; // move if busy
      if (move)
      r
      = ThreadLocalRandom.advanceProbe(r);
      }
      }





  2. fork任务切分的提交:ForkJoinTask.fork() -> ForkJoinWorkerThread.workQueue.push(task)/ForkJoinPool.common.externalPush(task) -> ForkJoinPool.push(task)/externalPush(task)



    1. fork:

      public final ForkJoinTask fork() {
      Thread t;
      if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//当前线程是workerThread,任务直接放入workerThread当前的workQueue
      ((ForkJoinWorkerThread)t).workQueue.push(this);
      else
      ForkJoinPool.common.externalPush(
      this);//将任务添加到随机选取的队列中或新创建的队列中
      return this;
      }



    2.  push:

      public class ForkJoinPool extends AbstractExecutorService {
      static final class WorkQueue {
      final void push(ForkJoinTask task) {
      ForkJoinTask
      [] a; ForkJoinPool p;
      int b = base, s = top, n;
      if ((a = array) != null) { // ignore if queue removed,队列被移除忽略
      int m = a.length - 1; // fenced write for task visibility
      U.putOrderedObject(a, ((m & s) <//任务加入队列中
      U.putOrderedInt(this, QTOP, s + 1);//挪动下次任务存放的槽的位置
      if ((n = s - b) <= 1) {//当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
      if ((p = pool) != null)
      p.signalWork(p.workQueues,
      this);
      }
      else if (n >= m)//数组所有元素都满了进行2倍扩容
      growArray();
      }
      }
      final ForkJoinTask[] growArray() {
      ForkJoinTask
      [] oldA = array;
      int size = oldA != null ? oldA.length <<1 : INITIAL_QUEUE_CAPACITY;//2倍扩容或初始化
      if (size > MAXIMUM_QUEUE_CAPACITY)
      throw new RejectedExecutionException("Queue capacity exceeded");
      int oldMask, t, b;
      ForkJoinTask
      [] a = array = new ForkJoinTask[size];
      if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
      (t
      = top) - (b = base) > 0) {
      int mask = size - 1;
      do { // emulate poll from old array, push to new array遍历从旧数组中取出放到新数组中
      ForkJoinTask x;
      int oldj = ((b & oldMask) < ABASE;
      int j = ((b & mask) < ABASE;
      x = (ForkJoinTask)U.getObjectVolatile(oldA, oldj);//从旧数组中取出
      if (x != null &&
      U.compareAndSwapObject(oldA, oldj, x,
      null))//将旧数组取出的位置的对象置为null
      U.putObjectVolatile(a, j, x);//放入新数组
      } while (++b != t);
      }
      return a;
      }
      }
      }





  任务的消费

  任务的消费的执行链路是ForkJoinTask.doExec() -> RecursiveTask.exec()/RecursiveAction.exec() -> 覆盖重写的compute()



  1.  doExec:任务的执行入口


    final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
    try {
    completed
    = exec();//消费任务
    } catch (Throwable rex) {
    return setExceptionalCompletion(rex);
    }
    if (completed)
    s
    = setCompletion(NORMAL);//任务执行完设置状态为NORMAL,并唤醒其他等待任务
    }
    return s;
    }
    protected abstract boolean exec();
    private int setCompletion(int completion) {
    for (int s;;) {
    if ((s = status) <0)
    return s;
    if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {//任务状态修改为NORMAL
    if ((s >>> 16) != 0)//状态不是SMASK
    synchronized (this) { notifyAll(); }//唤醒其他等待任务
    return completion;
    }
    }
    }
    /** The run status of this task 任务的运行状态*/
    volatile int status; // accessed directly by pool and workers由ForkJoinPool池或ForkJoinWorkerThread控制
    static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
    static final int NORMAL = 0xf0000000; // must be negative
    static final int CANCELLED = 0xc0000000; // must be
    static final int EXCEPTIOnAL= 0x80000000; // must be
    static final int SIGNAL = 0x00010000; // must be >= 1 <<16
    static final int SMASK = 0x0000ffff; // short bits for tags




  任务真正执行处理逻辑

  任务提交到ForkJoinPool,最终真正的是由继承Thread的ForkJoinWorkerThread的run方法来执行消费任务的,ForkJoinWorkerThread处理哪个任务是由join来出队的;





    1. ForkJoinTask.join()


      public final V join() {
      int s;
      if ((s = doJoin() & DONE_MASK) != NORMAL)
      reportException(s);
      return getRawResult();//得到返回结果
      }
      private int doJoin() {
      int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
      /**
      * (s = status) <0 判断任务是否已经完成,完成直接返回s
      * 任务未完成:
      * 1)线程是ForkJoinWorkerThread,tryUnpush任务出队然后消费任务doExec
      * 1.1)出队或消费失败,执行awaitJoin进行自旋,如果任务状态是完成就退出,否则继续尝试出队,直到任务完成或超时为止;
      * 2)如果线程不是ForkJoinWorkerThread,执行externalAwaitDone进行出队消费
      */
      return (s = status) <0 ? s :
      ((t
      = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
      (w
      = (wt = (ForkJoinWorkerThread)t).workQueue).
      tryUnpush(
      this) && (s = doExec()) <0 ? s :
      wt.pool.awaitJoin(w,
      this, 0L) :
      externalAwaitDone();
      }
      private void reportException(int s) {
      if (s == CANCELLED)//取消
      throw new CancellationException();
      if (s == EXCEPTIONAL)//异常
      rethrow(getThrowableException());
      }




      1. awaitJoin:

        public class ForkJoinPool{
        final int awaitJoin(WorkQueue w, ForkJoinTask task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
        ForkJoinTask
        prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter
        cc = (task instanceof CountedCompleter) ?
        (CountedCompleter
        )task : null;
        for (;;) {
        if ((s = task.status) <0)//任务完成退出
        break;
        if (cc != null)//当前任务即将完成,检查是否还有其他的等待任务,如果有
        //运行当前队列的其他任务,若当前的队列中没有任务了,则窃取其他队列的任务并运行
        helpComplete(w, cc, 0);
        //当前队列没有任务了,或队列只剩下最后一个任务执行完了
        else if (w.base == w.top || w.tryRemoveAndExec(task))
        helpStealer(w, task);
        //窃取其他队列的任务
        if ((s = task.status) <0)
        break;
        long ms, ns;
        if (deadline == 0L)
        ms
        = 0L;
        else if ((ns = deadline - System.nanoTime()) <= 0L)//超时退出
        break;
        else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
        ms
        = 1L;
        if (tryCompensate(w)) {//当前队列阻塞了
        task.internalWait(ms);//进行等待
        U.getAndAddLong(this, CTL, AC_UNIT);
        }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
        }
        }



      2. externalAwaitDone:

        private int externalAwaitDone() {
        /**
        * 当前任务是CountedCompleter
        * 1)是则执行ForkJoinPool.common.externalHelpComplete()
        * 2)否则执行ForkJoinPool.common.tryExternalUnpush(this)进行任务出队
        * 2.1)出队成功,进行doExec()消费,否则进行阻塞等待
        */
        int s = ((this instanceof CountedCompleter) ? // try helping
        ForkJoinPool.common.externalHelpComplete(
        (CountedCompleter
        )this, 0) :
        ForkJoinPool.common.tryExternalUnpush(
        this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {//任务未完成
        boolean interrupted = false;
        do {
        if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//任务状态标记为SIGNAL
        synchronized (this) {
        if (status >= 0) {
        try {
        wait(
        0L);//阻塞等待
        } catch (InterruptedException ie) {//有中断异常
        interrupted = true;//设置中断标识为true
        }
        }
        else
        notifyAll();
        //任务完成唤醒其他任务
        }
        }
        }
        while ((s = status) >= 0);
        if (interrupted)
        Thread.currentThread().interrupt();
        //当前线程进行中断
        }
        return s;
        }
        final int externalHelpComplete(CountedCompleter task, int maxTasks) {
        WorkQueue[] ws;
        int n;
        int r = ThreadLocalRandom.getProbe();
        //没有任务直接结束,有任务则执行helpComplete
        //helpComplete:运行随机选取的队列的任务,若选取的队列中没有任务了,则窃取其他队列的任务并运行
        return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
        helpComplete(ws[(n
        - 1) & r & SQMASK], task, maxTasks);
        }
         





  1. run和工作窃取



  任务是由workThread来窃取的,workThread是一个线程。线程的所有逻辑都是由run()方法执行:

public class ForkJoinWorkerThread extends Thread {
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
//初始化状态
pool.runWorker(workQueue);//处理任务队列
} catch (Throwable ex) {
exception
= ex;
}
finally {
try {
onTermination(exception);
}
catch (Throwable ex) {
if (exception == null)
exception
= ex;
}
finally {
pool.deregisterWorker(
this, exception);
}
}
}
}
}
public class ForkJoinPool{
final void runWorker(WorkQueue w) {
w.growArray();
// allocate queue,队列初始化
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask t;;) {//自旋
if ((t = scan(w, r)) != null)//从队列中窃取任务成功,scan()进行任务窃取
w.runTask(t);//执行任务,内部方法调用了doExec()进行任务的消费
else if (!awaitWork(w, r))//队列没有任务了则结束
break;
r
^= r <<13; r ^= r >>> 17; r ^= r <<5; // xorshift
}
}
}




    1. scan:

      private ForkJoinTask scan(WorkQueue w, int r) {
      WorkQueue[] ws;
      int m;
      if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
      int ss = w.scanState; // initially non-negative
      for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
      WorkQueue q; ForkJoinTask
      [] a; ForkJoinTask t;
      int b, n; long c;
      if ((q = ws[k]) != null) { //随机选中了非空队列 q
      if ((n = (b = q.base) - q.top) <0 &&
      (a
      = q.array) != null) { // non-empty
      long i = (((a.length - 1) & b) <//从尾部出队,b是尾部下标
      if ((t = ((ForkJoinTask)
      U.getObjectVolatile(a, i)))
      != null &&
      q.base
      == b) {
      if (ss >= 0) {
      if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出队
      q.base = b + 1;
      if (n <-1) // signal others
      signalWork(ws, q);
      return t; //出队成功,成功窃取一个任务!
      }
      }
      else if (oldSum == 0 && // try to activate 队列没有激活,尝试激活
      w.scanState <0)
      tryRelease(c
      = ctl, ws[m & (int)c], AC_UNIT);
      }
      if (ss <0) // refresh
      ss = w.scanState;
      r
      ^= r <<1; r ^= r >>> 3; r ^= r <<10;
      origin
      = k = r & m; // move and rescan
      oldSum = checkSum = 0;
      continue;
      }
      checkSum
      += b;
      }

      //k = k + 1表示取下一个队列 如果(k + 1) & m == origin表示 已经遍历完所有队列了
      if ((k = (k + 1) & m) == origin) { // continue until stable
      if ((ss >= 0 || (ss == (ss = w.scanState))) &&
      oldSum
      == (oldSum = checkSum)) {
      if (ss <0 || w.qlock <0) // already inactive
      break;
      int ns = ss | INACTIVE; // try to inactivate
      long nc = ((SP_MASK & ns) |
      (UC_MASK
      & ((c = ctl) - AC_UNIT)));
      w.stackPred
      = (int)c; // hold prev stack top
      U.putInt(w, QSCANSTATE, ns);
      if (U.compareAndSwapLong(this, CTL, c, nc))
      ss
      = ns;
      else
      w.scanState
      = ss; // back out
      }
      checkSum
      = 0;
      }
      }
      }
      return null;
      }



    2. ForkJoinPool.runTask:

      final void runTask(ForkJoinTask task) {
      if (task != null) {
      scanState
      &= ~SCANNING; // mark as busy
      (currentSteal = task).doExec();
      U.putOrderedObject(
      this, QCURRENTSTEAL, null); // release for GC
      execLocalTasks();
      ForkJoinWorkerThread thread
      = owner;
      if (++nsteals <0) // collect on overflow
      transferStealCount(pool);
      scanState
      |= SCANNING;
      if (thread != null)
      thread.afterTopLevelExec();
      }
      }





四.总结

  对于fork/join来说,在使用时还是存在下面的一些问题的:


  • 在使用JVM的时候我们要考虑OOM的问题,如果我们的任务处理时间非常耗时,并且处理的数据非常大的时候,会造成OOM;

  • ForkJoin是通过多线程的方式进行处理任务,那么我们不得不考虑是否应该使用ForkJoin。因为当数据量不是特别大的时候,我们没有必要使用ForkJoin。因为多线程会涉及到上下文的切换,所以数据量不大的时候使用串行比使用多线程快;

    • 项目中进行本地测试发现,业务层Service进行excel表数据(数据量几百)的复杂处理,进行单线程for循环统计消耗时间,然后与使用fork/join进行处理统计消耗时间,发现fork/join的消耗时间是单线程for的2倍;




推荐阅读
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 第七课主要内容:多进程多线程FIFO,LIFO,优先队列线程局部变量进程与线程的选择线程池异步IO概念及twisted案例股票数据抓取 ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 我所理解的JMM 2 new原子性
    概述文本探讨构造函数是否为原子性问题。案例我们首先如下代码:publicclassPerson{publicintage;publicPerson(){age ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
author-avatar
程橙屋04_kc275_938
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有