Fork/Join线程池是Java 7中引入的一个用于并行执行任务的框架,旨在充分利用多核处理器的计算能力,加快处理速度,提高性能。
分治思想图
将大任务拆分成多个小任务,多个小任务并行计算,最终将结果汇总
原理
任务分解(Fork)
Fork/Join框架的核心思想是将一个大任务分解(Fork)成若干个小任务。如果这些小任务还太大,则继续分解,直到足够小可以直接计算。
任务分解通常采用递归的方式进行,这是Fork/Join框架鼓励的并行计算模式。
并行执行
分解后的小任务会被并行地提交到ForkJoinPool(ForkJoin框架的线程池实现)中执行。
ForkJoinPool中的每个线程都维护自己的双端队列,用于存储待执行的任务。线程执行完队列中的一个任务后,会尝试执行下一个任务。
工作窃取(Work-Stealing)
ForkJoinPool采用工作窃取算法来优化任务的执行。当某个线程的任务队列为空时,它会尝试从其他线程的任务队列中“偷取”任务来执行,以此来减少线程间的竞争和提高效率。
结果合并(Join)
当所有子任务执行完成后,需要将这些子任务的结果合并(Join)以得到最终的结果。
使用
创建ForkJoinPool
首先,需要创建一个ForkJoinPool对象来管理ForkJoin任务的执行。创建时可以指定并行级别(parallelism level),即线程池中的线程数量。如果不指定,ForkJoinPool将使用系统的可用处理器数量作为并行级别。
ForkJoinPool pool = new ForkJoinPool();
定义ForkJoinTask
接下来,需要定义ForkJoinTask的子类来实现具体的任务逻辑。ForkJoinTask有两个重要的子类:RecursiveAction(用于没有返回结果的任务)和RecursiveTask(用于有返回结果的任务)。
在子类中,需要重写compute()方法来定义任务的执行逻辑,包括任务的分解、执行和结果合并。
// 简单的求和任务
class SumTask extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public SumTask(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length < THRESHOLD) { // THRESHOLD是一个预设的阈值,用于决定任务是否需要继续分解
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
} else {
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(numbers, start, middle);
SumTask rightTask = new SumTask(numbers, middle, end);
leftTask.fork(); // 递归分解任务
long rightResult = rightTask.compute(); // 直接计算右半部分
long leftResult = leftTask.join(); // 等待左半部分完成并获取结果
return leftResult + rightResult;
}
}
}
//SumTask leftTask = new SumTask(numbers, start, middle);
//SumTask rightTask = new SumTask(numbers, middle, end);
//leftTask.fork();
//rightTask.fork();
//return leftTask.join() + rightTask.join();
提交任务并执行
使用ForkJoinPool的submit()、invoke()或execute()方法将ForkJoinTask提交到线程池中执行。
submit()方法会返回一个Future对象,可以用来查询任务执行的结果。
invoke()方法是同步的,会等待任务执行完成并返回结果。
execute()方法是异步的,提交任务后立即返回,不会等待任务完成。
long[] numbers = ...; // 初始化数组
SumTask task = new SumTask(numbers, 0, numbers.length);
Long result = pool.invoke(task); // 提交任务并等待结果
结果处理
对于有返回结果的任务,可以使用Future对象的get()方法来获取结果,但需要注意get()方法是阻塞的,直到任务完成。
在RecursiveTask中,可以通过join()方法等待子任务完成并获取其结果,然后将这些结果合并以得到最终的结果。
评论区