在Java高并发领域,线程池一直是一个不可回避的话题。有些童鞋一直在用线池。然而,如何创建线程池仅仅停留在使用执行器工具类的方式上。那么,创建线程池的方法有哪些呢?让我们从创建线程池的源代码中深入分析创建线程池的方法。
使用Executors工具类创建线程池
创建线程池时,初学者最常使用工具类Executors。但是,当使用这个工具类创建线程池时,它非常简单,没有过多关注线程池的细节,只是传入必要的参数。Executors工具类提供了几种创建线程池的方法,如下所示。
Executors.newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过需要,空闲线程可以灵活回收。如果没有可回收的线程,可以创建一个新线程。
Executors.newFixedThreadPool:创建一个固定长度的线程池,可以控制并发线程的最大数量,多余的线程会在队列中等待。
执行者。newscheduledthreadpool:创建一个固定长度的线程池,以支持定期和周期性的任务执行。
执行者。newsinglethreadexecutor :创建一个单线程线程池,使用一个唯一的工作线程来执行任务,并确保所有任务都按照指定的顺序(先进先出或优先级)执行。
执行者。newsingleterieadschedule executor :创建一个单线程线程池,支持定期和周期性的任务执行。
创建一个并行级别的窃取工作的线程池。
其中,
Executors.newWorkStealingPool方法是Java 8中创建线程池的一种新方法。它可以为线程池设置并行级别,具有较高的并发性和性能。除了这个方法之外,其他创建线程池的方法本质上都是调用ThreadPoolExecutor类的构造函数方法。
例如,我们可以使用下面的代码创建一个线程池。
executors . new workstealingpool();
executors . new cachedthreadpool();
executors . newscheduledreadpool(3);
使用ThreadPoolExecutor类创建线程池
从代码结构来看,ThreadPoolExecutor类继承了AbstractExecutorService,也就是说,threadpoolexecutorservice类拥有AbstractExecutorService类的所有功能。
由于Executors工具类中创建的大部分线程池都调用了ThreadPoolExecutor类的构造函数,所以我们也可以直接调用ThreadPoolExecutor类的构造函数来创建线程池,而不用Executors工具类。接下来,让我们看看ThreadPoolExecutor类的构造方法。
ThreadPoolExecutor类中的所有构造方法如下。
公共线程池执行器(int corePoolSize,
int maximumPoolSize,
长keepAliveTime,
时间单位单位,
BlockingQueueRunnable workQueue){ 0
这个(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory()、defaultHandler);
}
公共线程池执行器(int corePoolSize,
int maximumPoolSize,
长keepAliveTime,
时间单位单位,
BlockingQueueRunnable工作队列,
thread factory thread factory){ 0
这个(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
threadFactory,DefaultHandler);
}
公共线程池执行器(int corePoolSize,
int maximumPoolSize,
长keepAliveTime,
时间单位单位,
BlockingQueueRunnable工作队列,
RejectedExecutionHandler处理程序){ 0
这个(corePoolSize
, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }由ThreadPoolExecutor类的构造方法的源代码可知,创建线程池最终调用的构造方法如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
关于此构造方法中各参数的含义和作用,各位可以移步《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》进行查阅。
大家可以自行调用ThreadPoolExecutor类的构造方法来创建线程池。例如,我们可以使用如下形式创建线程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
使用ForkJoinPool类创建线程池
在Java8的Executors工具类中,新增了如下创建线程池的方式。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
从源代码可以可以,本质上调用的是ForkJoinPool类的构造方法类创建线程池,而从代码结构上来看ForkJoinPool类继承自AbstractExecutorService抽象类。接下来,我们看下ForkJoinPool类的构造方法。
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
通过查看源代码得知,ForkJoinPool的构造方法,最终调用的是如下私有构造方法。
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
其中,各参数的含义如下所示。
- parallelism:并发级别。
- factory:创建线程的工厂类对象。
- handler:当线程池中的线程抛出未捕获的异常时,统一使用UncaughtExceptionHandler对象处理。
- mode:取值为FIFO_QUEUE或者LIFO_QUEUE。
- workerNamePrefix:执行任务的线程名称的前缀。
当然,私有构造方法虽然是参数最多的一个方法,但是其不会直接对外方法,我们可以使用如下方式创建线程池。
new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
使用ScheduledThreadPoolExecutor类创建线程池
在Executors工具类中存在如下方法类创建线程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
从源码来看,这几个方法本质上调用的都是
ScheduledThreadPoolExecutor类的构造方法,ScheduledThreadPoolExecutor中存在的构造方法如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
而从代码结构上看,
ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor类,本质上还是调用ThreadPoolExecutor类的构造方法,只不过此时传递的队列为DelayedWorkQueue。我们可以直接调用ScheduledThreadPoolExecutor类的构造方法来创建线程池,例如以如下形式创建线程池。
new ScheduledThreadPoolExecutor(3)