侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Fork/Join线程池的原理和使用

江南的风
2015-10-18 / 0 评论 / 0 点赞 / 18 阅读 / 4895 字 / 正在检测是否收录...

Fork/Join线程池是Java 7中引入的一个用于并行执行任务的框架,旨在充分利用多核处理器的计算能力,加快处理速度,提高性能。

分治思想图

将大任务拆分成多个小任务,多个小任务并行计算,最终将结果汇总

原理

  • 任务分解(Fork)

Fork/Join框架的核心思想是将一个大任务分解(Fork)成若干个小任务。如果这些小任务还太大,则继续分解,直到足够小可以直接计算。

任务分解通常采用递归的方式进行,这是Fork/Join框架鼓励的并行计算模式。

  • 并行执行

分解后的小任务会被并行地提交到ForkJoinPool(ForkJoin框架的线程池实现)中执行。

ForkJoinPool中的每个线程都维护自己的双端队列,用于存储待执行的任务。线程执行完队列中的一个任务后,会尝试执行下一个任务。

  • 工作窃取(Work-Stealing)

ForkJoinPool采用工作窃取算法来优化任务的执行。当某个线程的任务队列为空时,它会尝试从其他线程的任务队列中“偷取”任务来执行,以此来减少线程间的竞争和提高效率。

  • 结果合并(Join)

当所有子任务执行完成后,需要将这些子任务的结果合并(Join)以得到最终的结果。

使用

  • 创建ForkJoinPool

首先,需要创建一个ForkJoinPool对象来管理ForkJoin任务的执行。创建时可以指定并行级别(parallelism level),即线程池中的线程数量。如果不指定,ForkJoinPool将使用系统的可用处理器数量作为并行级别。

ForkJoinPool pool = new ForkJoinPool(); 
  • 定义ForkJoinTask

接下来,需要定义ForkJoinTask的子类来实现具体的任务逻辑。ForkJoinTask有两个重要的子类:RecursiveAction(用于没有返回结果的任务)和RecursiveTask(用于有返回结果的任务)。

在子类中,需要重写compute()方法来定义任务的执行逻辑,包括任务的分解、执行和结果合并。

// 简单的求和任务
class SumTask extends RecursiveTask<Long> {  
    private final long[] numbers;  
    private final int start;  
    private final int end;  
  
    public SumTask(long[] numbers, int start, int end) {  
        this.numbers = numbers;  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Long compute() {  
        long length = end - start;  
        if (length < THRESHOLD) { // THRESHOLD是一个预设的阈值,用于决定任务是否需要继续分解  
            long sum = 0;  
            for (int i = start; i < end; i++) {  
                sum += numbers[i];  
            }  
            return sum;  
        } else {  
            int middle = (start + end) / 2;  
            SumTask leftTask = new SumTask(numbers, start, middle);  
            SumTask rightTask = new SumTask(numbers, middle, end);  
            leftTask.fork(); // 递归分解任务  
            long rightResult = rightTask.compute(); // 直接计算右半部分  
            long leftResult = leftTask.join(); // 等待左半部分完成并获取结果  
            return leftResult + rightResult;  
        }  
    }  
}

//SumTask leftTask = new SumTask(numbers, start, middle);  
//SumTask rightTask = new SumTask(numbers, middle, end);  
//leftTask.fork(); 
//rightTask.fork(); 
//return leftTask.join() + rightTask.join();  
  • 提交任务并执行

使用ForkJoinPool的submit()、invoke()或execute()方法将ForkJoinTask提交到线程池中执行。

submit()方法会返回一个Future对象,可以用来查询任务执行的结果。

invoke()方法是同步的,会等待任务执行完成并返回结果。

execute()方法是异步的,提交任务后立即返回,不会等待任务完成。

long[] numbers = ...; // 初始化数组  
SumTask task = new SumTask(numbers, 0, numbers.length);  
Long result = pool.invoke(task); // 提交任务并等待结果  
  • 结果处理

对于有返回结果的任务,可以使用Future对象的get()方法来获取结果,但需要注意get()方法是阻塞的,直到任务完成。

在RecursiveTask中,可以通过join()方法等待子任务完成并获取其结果,然后将这些结果合并以得到最终的结果。

0

评论区