0%

ThreadPoolExecutor自定义线程池

文章字数:316,阅读全文大约需要1分钟

自定义线程池,设置数据

构造方法

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

构造方法参数

名称 类型 含义
corePoolSize int 核心线程池大小
maximumPoolSize int 最大线程池大小
keepAliveTime long 最大线程池空闲时间
unit TimeUnit 时间单位
workQueue BlockingQueue 线程等待队列,如ArrayBlockingQueue,有界队列,构造函数需要传入队列最大值。
threadFactory ThreadFactory 线程创建工厂
handler RejectedExecutionHandler 拒绝策略

预定义线程池

  1. FixedThreadPool:
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数和最大线程数相同,所以是固定大小。
  • keepAliveTime对核心线程无效
  • LinkedBlockingQueue是无界阻塞队列,最大值是Integer.MAX_VALUE。如果提交速度大于处理速度,会造成队列阻塞,又因为队列无界,所以可能会内存溢出。
  1. CachedThreadPool:
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 核心线程0,最大线程Integer.MAX_VALUE。无核心线程,最大线程几乎无限。
  • keepAliveTime = 60,即60s后空闲线程自动结束
  • workQueueSynchronousQueue,无缓冲队列。入队和出队必须同时进行。

自定义线程池案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class ThreadTest {

public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程

for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}

System.in.read(); //阻塞主线程
}

static class NameTreadFactory implements ThreadFactory {

private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}

public static class MyIgnorePolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}

static class MyTask implements Runnable {
private String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String getName() {
return name;
}

@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}