0%

并发工具类

文章字数:755,阅读全文大约需要3分钟

一、Fork/Join

1.1分治思想

  • 体现了分而治之的算法思想
  • 将大任务分成若干规模小的子任务
  • 规模为N的问题,N<阈值直接解决。N>阈值分解成若干小规模问题。子问题见互相对立,且与原问题形式相同。子问题的解合并就是原问题的解。
  • 动态规划和分治的不同在于动态规划的子问题间没有联系。

1.2 fork/join

  • Fork将大问题分成小问题
  • Join小问题的解合并成大问题的解
  • 工作窃取,线程执行完自己任务队列的任务后会从其他任务队列最后面取任务执行。节约资源
1
2
3
4
5
6
ForkJoinPool pool = new ForkJoinPool();
// 自己定义的任务
MyTask myTask = new ForkJoinTask(src, 0, src.length - 1);
Pool.invoke(myTask);// 主线程阻塞
// pool.execute(myTask);// 异步执行主线程不阻塞
Result = myTask.join();//阻塞等待结果

自定义任务需要继承于RecuriveTask/RecursiveAction/ForkJoinTaskForkJoinTask是父级类,一般用另外两个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// <>里为返回值
class SumTask extends RecursiveTask<Integer> {
private final static int THRESHOLD = 10; // 子任务的最大数量
private int[] src; // 表示要处理的数组
private fromIndex; //开始的下标
private toIndex; // 结束的下标

public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}

@Override
protected Integer compute() {
if(toIndex - fromIndex < THRESHOLD) {
// 小于子任务最大数量,直接计算返回
int count = 0;
for(int i = fromIndex; i <= toIndex; i++) {
count = count + src[i];
}
return count;
} else {
int mid = (formIndex + toIndex) / 2;
// 拆分成两个任务,并执行,返回两个任务执行结果的和
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
invokeAll(left, right);
return left.join + right.join();
}
}
}

二、CountDownlatch

  • 一个线程等待其他线程工作完成之后执行,加强版的join
  • CountDownlatch(n)初始化n个标记
  • await()等待标记数量为0
  • countdown()标记数量-1

三、CyclicBarrier

  • 一组线程达到某个屏障,阻塞。直到一组最后一个到达屏障,开放。
  • CyclicBarrier(n)总共几个线程
  • await()等待所有的线程都到达await()位置
  • 等于CountDownlatch每个线程自动countdown()

四、Semaphore

  • 控制同时访问某个特定资源的数量,常用于流量控制
  • new Semaphore(n)初始化n个令牌,release()可累加超过初始的数
  • acquire()令牌-1
  • release()令牌+1

五、Exchange

  • 两个线程之间的数据交换
  • exchange(Object)两个线程都调用此方法,线程会阻塞在此位置。直到两个线程都到这个位置,会进行一次数据交换。
  • 只能适用于两个线程

六、Callable和Future

  • Callable可以返回值和抛出异常,线程的业务内容
  • FutureTaskCallable包装成Runnable

6.1future接口

  • isDone是否结束
  • isCanncelled任务完成前被取消,返回true
  • cancel(true)中断并运行任务,成功返回truecancel(false)不会中断。已结束返回false
  • get()阻塞等待结果