0%

JUC

文章字数:2051,阅读全文大约需要8分钟

Java.Util.Concurrent是JDK1.5之后推出的java并发工具包。其中有很多多线程工具类,能有效减少竞争条件,减少死锁进程。

线程池

线程池是一组可复用的线程集合。由处理线程+任务队列组成。

涉及对象信息

  1. Executor: 线程池中Runnable任务执行者。
  2. ExecutorService: 管理线程池对象,能够把RunnableCallable交到线程池中执行
  3. Executors: 此类的静态方法能够生成不同的线程池,并返回ExecutorService用于管理。

常用程池类型

可以使用ThreadPoolExecutor 的构造函数创建属性细节不同的线程池,一下为已经定义好的线程池

  1. newFixedThreadPool: 固定大小的线程池,共享的无界队列管理任务。关闭前发生异常导致线程终止,则会使用新线程代替。(如果需要)

  2. newCachedThreadPool: 无界线程池,当需要线程使用但是线程池内无线程,则会新建一个线程并加入线程池。60s未用的线程会被移除。

  3. newSingleThreadExecutor: 无界队列,单个执行线程。如果关闭前因为异常线程被迫结束,后续需要线程时会创建新的线程并替换不可用的。

  4. ThreadPoolExecutor: 这个类的构造方法可以生成自定义配置的线程池,可以设置最大线程数最小线程数空闲线程keepAlive时间

以上线程池生成后返回的都是ExecutorService对象

  1. ExecutorService.submit(): 方法可以提交任务给线程池

  2. ExecutorService.shutdown(): 此方法可以结束线程池

Semaphore计数信号量(许可集合)

可以初始化任意数量的许可。acquire()方法会拿走一个许可,如果没有许可了则阻塞。release()方法可以释放当前线程拿走的许可。限制同一时间可以同时执行的线程数量

10个人抢2个位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class MySemaphore extends Thread {
private Semaphore position;
private int id;

public MySemaphore(int i, Semaphore s) {
this.id = i;
this.position = s;
}

public void run() {
try {
//这个方法可以判断还有多少许可
if (position.availablePermits() > 0) {
System.out.println("顾客[" + this.id + "]进入厕所,有空位");
}else {
System.out.println("顾客[" + this.id + "]进入厕所,没空位,排队");
}
//获取到空厕所了(没许可等待许可)
position.acquire();
System.out.println("顾客[" + this.id + "]获得坑位");
//使用中...
Thread.sleep((int) (Math.random() * 1000));
System.out.println("顾客[" + this.id + "]使用完毕");
//厕所使用完之后释放
position.release();
}catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String args[]) {
ExecutorService list = Executors.newCachedThreadPool();
Semaphore position = new Semaphore(2);//只有两个厕所
//有十个人
for (int i = 0; i < 10; i++) {
list.submit(new MySemaphore(i + 1, position));
}
list.shutdown();
position.acquireUninterruptibly(2);
System.out.println("使用完毕,需要清扫了");
position.release(2);
}
}

ReentrantLock可重入互斥锁

一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。

  1. ReentrantLock将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。

  2. 当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。

  3. 如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。

  4. 此类的构造方法接受一个可选的公平参数。当设置为 true时,在多个线程的争用下,这些锁定倾向于将访问权授予等待时间最长的线程。否则此锁定将无法保证任何特定访问顺序。

与采用默认设置(使用不公平锁定)相比,使用公平锁定的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁定和保证锁定分配的均衡性时差异较小。
不过要注意的是,公平锁定不能保证线程调度的公平性。因此,使用公平锁定的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁定时。还要注意的是,未定时的 tryLock 方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁定是可用的,此方法就可以获得成功。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
}finally {
lock.unlock()
}
}
}

CountDownLatch同步辅助类(计数)

初始化锁,指定一个数量。begin.await()会阻塞线程,begin.countDown();每次计数-1,直到为0阻塞的线程才会释放

CyclicBarrier同步辅助类(公共屏障)

锁初始化时指定几个await()之后await()方法才会释放当前线程

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestCyclicBarrier {

// 徒步需要的时间: 分别代表Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan的时间
private static int[] timeWalk = {5, 8, 15, 15, 10};
// 自驾游
private static int[] timeSelf = {1, 3, 4, 4, 5};
// 旅游大巴
private static int[] timeBus = {2, 4, 6, 6, 7};

static String now() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
return sdf.format(new Date()) + ": ";
}

static class Tour implements Runnable {

private int[] times;
private CyclicBarrier barrier;
private String tourName;

public Tour(CyclicBarrier barrier, String tourName, int[] times) {
this.times = times;
this.tourName = tourName;
this.barrier = barrier;
}

public void run() {
try {
Thread.sleep(times[0] * 1000);
System.out.println(now() + tourName + " Reached Shenzhen");
// 等三个线程执行完之后才会释放
barrier.await();
Thread.sleep(times[1] * 1000);
System.out.println(now() + tourName + " Reached Guangzhou");
barrier.await();
Thread.sleep(times[2] * 1000);
System.out.println(now() + tourName + " Reached Shaoguan");
barrier.await();
Thread.sleep(times[3] * 1000);
System.out.println(now() + tourName + " Reached Changsha");
barrier.await();
Thread.sleep(times[4] * 1000);
System.out.println(now() + tourName + " Reached Wuhan");
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}

public static void main(String[] args) {
// 三个旅行团(三个await()之后才会释放)
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
exec.submit(new Tour(barrier, "WalkTour", timeWalk));
exec.submit(new Tour(barrier, "SelfTour", timeSelf));
// 当我们把下面的这段代码注释后,会发现,程序阻塞了,无法继续运行下去。
exec.submit(new Tour(barrier, "BusTour", timeBus));
exec.shutdown();
}
}

阻塞队列

BlockingQueue强化的阻塞队列

支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

  1. 支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。

  2. BlockingQueue 不接受 null 元素。试图 addputoffer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。

  3. BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 额外的元素。

  4. 没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。

5.BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。(最好不要使用这些操作)

然而,这种操作通常不会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有必要自动执行,除非在实现中特别说明。
因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。
这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.concurrent.BlockingQueue;    
import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;  
import java.util.concurrent.LinkedBlockingQueue;  
  
public class MyBlockingQueue extends Thread {  
  
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);  
    private int index;  
  
    public MyBlockingQueue(int i) {  
        this.index = i;  
    }  
  
    public void run() {  
        try {  
            queue.put(String.valueOf(this.index));  
            System.out.println("{" + this.index + "} in queue!");  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    public static void main(String args[]) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        for (int i = 0; i < 10; i++) {  
            service.submit(new MyBlockingQueue(i));  
        }  
  
        Thread thread = new Thread() {  
            public void run() {  
                try {  
                    while (true) {  
                        Thread.sleep((int) (Math.random() * 1000));  
                        if (MyBlockingQueue.queue.isEmpty())  
                            break;  
                        String str = MyBlockingQueue.queue.take();  
                        System.out.println(str + " has take!");  
                    }  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        };  
        service.submit(thread);  
        service.shutdown();  
    }  
}

线程返回结果

CompletionService包装线程池返回结果

相比于一个个去获取线程的结果Future.get()会造成线程阻塞,消耗时间。循环CompletionService就是一个保存已经完成的线程的结果FutureBlockingQueue,take()方法就会从中取出一个。

循环executorCompletionService.take().get()就能得到结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.Callable;  
  
import java.util.concurrent.CompletionService;  
  
import java.util.concurrent.ExecutorCompletionService;  
  
import java.util.concurrent.ExecutorService;  
  
import java.util.concurrent.Executors;  
  
public class MyCompletionService implements Callable<String{  
    private int id;  
    public MyCompletionService(int i) {  
        this.id = i;  
    }  
  
    public static void main(String[] args) throws Exception {  
        ExecutorService service = Executors.newCachedThreadPool();  
        CompletionService<String> completion = new ExecutorCompletionService<String>(service);  
        for (int i = 0; i < 10; i++) {  
            completion.submit(new MyCompletionService(i));  
        }  
          
        for (int i = 0; i < 10; i++) {  
            System.out.println(completion.take().get());  
        }  
        service.shutdown();  
    }  
  
    public String call() throws Exception {  
        Integer time = (int) (Math.random() * 1000);  
        try {  
            System.out.println(this.id + " start");  
            Thread.sleep(time);  
            System.out.println(this.id + " end");  
        }catch (Exception e) {  
            e.printStackTrace();  
        }  
        return this.id + ":" + time;  
    }  
  
}

Future异步执行结果

执行线程后返回的Future代表线程返回的结果,用于检测异步线程是否结束,如果结束获取返回值,没结束可以阻塞线程直到有结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  
public class MyFutureTask {  
    /** 
     * @param args 
     * @throws InterruptedException  
     * @throws ExecutionException 
     * @throws InterruptedException 
     * @throws ExecutionException  
     */  
    public static void main(String[] args) throws InterruptedException, ExecutionException {  
          
        final ExecutorService exe=Executors.newFixedThreadPool(3);  
        Callable<String> call=new Callable<String>(){  
            public String call() throws InterruptedException {  
                return "Thread is finished";  
            }  
        };  
        Future<String> task=exe.submit(call);  
        String obj=task.get();  
        System.out.println(obj+"进程结束");  
        System.out.println("总进程结束");  
        exe.shutdown();  
    }  
}  
class MyThreadTest implements Runnable {  
    private String str;  
    public MyThreadTest(String str) {  
        this.str = str;  
    }  
    public void run() {  
        this.setStr("allen"+str);  
    }  
    public void addString(String str) {  
        this.str = "allen:" + str;  
    }  
    public String getStr() {  
        return str;  
    }  
    public void setStr(String str) {  
        this.str = str;  
    }  
}

定时器延时器

创建延时任务,取消任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import static java.util.concurrent.TimeUnit.SECONDS;  
  
import java.util.Date;  
  
import java.util.concurrent.Executors;  
  
import java.util.concurrent.ScheduledExecutorService;  
  
import java.util.concurrent.ScheduledFuture;  
  
public class TestScheduledThread {  
    public static void main(String[] args) {  
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);  
        final Runnable beeper = new Runnable() {  
            int count = 0;  
            public void run() {  
                System.out.println(new Date() + " beep " + (++count));  
            }  
        };  
  
        // 1秒钟后运行,并每隔2秒运行一次  
        final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 12, SECONDS);  
        // 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行  
        final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 25, SECONDS);  
        // 30秒后结束关闭任务,并且关闭Scheduler  
        scheduler.schedule(new Runnable() {  
            public void run() {  
                beeperHandle.cancel(true);  
                beeperHandle2.cancel(true);  
                scheduler.shutdown();  
            }  
  
        }, 30, SECONDS);   
    }   
}