Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,开发者可以在不去了解如Thread、Runnable等相关知识的情况下,只要遵循fork/join开发模式,就完成写出很好的多线程并发任务。
同时其按照**分而治之**的思想,可以把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
对于Fork/Join框架的理解可以认为其由两部分组成,Fork就是把一个大任务切分为若干个子任务并行执行。Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。

工作窃取算法
即当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取出 Task 继续执行。ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task,每个线程除了执行自己任务列表内的 Task 之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的 Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高 CPU 利用率。

Fork/Join的使用
基本概念
要使用Fork/Join的话,首先需要有一个Pool。通过它可以来执行任务。 而每一个任务叫做ForkJoinTask,其内部提供了fork和join的操作机制。通常情况下开发者不需要直接继承ForkJoinTask,而是继承它的子类。分别为:

1)新建ForkJoinPool;
2)新建ForkJoinTask(RecursiveAction || RecursiveTask);
3)在任务中的compute方法,会根据自定义条件进行任务拆分,如果条件满足则执行任务,如果条件不满足则继续拆分任务。当所有任务都执行完,进行最终结果的汇总。
4)最终通过get或join获取数据结果。
同步有结果值返回
需求:统计整型数组中所有元素的和。
public class GenArray {
public static final int ARRAY_LENGTH=400000;
public static int[] genArray(){
Random random = new Random(); int[] result = new int[ARRAY_LENGTH]; for (int i = 0; i < ARRAY_LENGTH; i++) { result[i]= random.nextInt(ARRAY_LENGTH*3); } return result; } }
|
public class SumNormal {
public static void main(String[] args) {
int count = 0; int[] src = GenArray.genArray();
long start = System.currentTimeMillis();
for (int i = 0; i < src.length; i++) { count+=src[i]; }
System.out.println("spend time: "+(System.currentTimeMillis()-start)); } }
|
public class SumForkJoin{
private static class SumTask extends RecursiveTask<Integer> {
private final static int THRESHOLD=GenArray.ARRAY_LENGTH/10; private int[] src; private int fromIndex; private int endIndex;
public SumTask(int[] src, int fromIndex, int endIndex) { this.src = src; this.fromIndex = fromIndex; this.endIndex = endIndex; }
@Override protected Integer compute() {
if (endIndex-fromIndex<THRESHOLD){ int count = 0; for (int i = fromIndex; i <= endIndex; i++) { count+=src[i]; } return count; }else { int mid = (fromIndex+endIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,endIndex); invokeAll(left,right); return left.join()+right.join(); } } }
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] src = GenArray.genArray(); SumTask sumTask = new SumTask(src,0,src.length-1);
long start = System.currentTimeMillis(); pool.invoke(sumTask); System.out.println("spend time: "+(System.currentTimeMillis()-start)); } }
|
根据执行结果可以看到,如果数据量较小的情况下,使用普通循环的效率更高,因为其内部是以总线的形式进行相加。而forkjoin的话,要利用当前可用的CPU核数结合线程的上下文切换,所以存在一定的性能消耗。
但是如果数据量较大的话,可以看到使用forkjoin的效率会明显高于普通for循环。
异步无结果值返回
需求:遍历目录(包含子目录)寻找txt类型文件。
public class FindFile extends RecursiveAction {
private File path;
public FindFile(File path) { this.path = path; }
@Override protected void compute() { List<FindFile> takes = new ArrayList<>();
File[] files = path.listFiles();
if (files != null){ for (File file : files) { if (file.isDirectory()){ takes.add(new FindFile(file)); }else { if (file.getAbsolutePath().endsWith("txt")){ System.out.println(file.getAbsolutePath()); } } }
if (!takes.isEmpty()){ for (FindFile task : invokeAll(takes)){ task.join(); } } } }
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
FindFile task = new FindFile(new File("F://"));
pool.submit(task);
task.join();
System.out.println("task end"); } }
|