自定义线程池拒绝策略_创建线程池的三种方法

Դ腾讯新闻

ߣseo技术

18

2021-11-06 13:44:46

在Java高并发领域,线程池一直是一个不可回避的话题。有些童鞋一直在用线池。然而,如何创建线程池仅仅停留在使用执行器工具类的方式上。那么,创建线程池的方法有哪些呢?让我们从创建线程池的源代码中深入分析创建线程池的方法。

使用Executors工具类创建线程池

创建线程池时,初学者最常使用工具类Executors。但是,当使用这个工具类创建线程池时,它非常简单,没有过多关注线程池的细节,只是传入必要的参数。Executors工具类提供了几种创建线程池的方法,如下所示。

Executors.newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过需要,空闲线程可以灵活回收。如果没有可回收的线程,可以创建一个新线程。

Executors.newFixedThreadPool:创建一个固定长度的线程池,可以控制并发线程的最大数量,多余的线程会在队列中等待。

执行者。newscheduledthreadpool:创建一个固定长度的线程池,以支持定期和周期性的任务执行。

执行者。newsinglethreadexecutor :创建一个单线程线程池,使用一个唯一的工作线程来执行任务,并确保所有任务都按照指定的顺序(先进先出或优先级)执行。

执行者。newsingleterieadschedule executor :创建一个单线程线程池,支持定期和周期性的任务执行。

创建一个并行级别的窃取工作的线程池。

其中,

Executors.newWorkStealingPool方法是Java 8中创建线程池的一种新方法。它可以为线程池设置并行级别,具有较高的并发性和性能。除了这个方法之外,其他创建线程池的方法本质上都是调用ThreadPoolExecutor类的构造函数方法。

例如,我们可以使用下面的代码创建一个线程池。

executors . new workstealingpool();

executors . new cachedthreadpool();

executors . newscheduledreadpool(3);

使用ThreadPoolExecutor类创建线程池

从代码结构来看,ThreadPoolExecutor类继承了AbstractExecutorService,也就是说,threadpoolexecutorservice类拥有AbstractExecutorService类的所有功能。

由于Executors工具类中创建的大部分线程池都调用了ThreadPoolExecutor类的构造函数,所以我们也可以直接调用ThreadPoolExecutor类的构造函数来创建线程池,而不用Executors工具类。接下来,让我们看看ThreadPoolExecutor类的构造方法。

ThreadPoolExecutor类中的所有构造方法如下。

公共线程池执行器(int corePoolSize,

int maximumPoolSize,

长keepAliveTime,

时间单位单位,

BlockingQueueRunnable workQueue){ 0

这个(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,

Executors.defaultThreadFactory()、defaultHandler);

}

公共线程池执行器(int corePoolSize,

int maximumPoolSize,

长keepAliveTime,

时间单位单位,

BlockingQueueRunnable工作队列,

thread factory thread factory){ 0

这个(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,

threadFactory,DefaultHandler);

}

公共线程池执行器(int corePoolSize,

int maximumPoolSize,

长keepAliveTime,

时间单位单位,

BlockingQueueRunnable工作队列,

RejectedExecutionHandler处理程序){ 0

这个(corePoolSize

, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

由ThreadPoolExecutor类的构造方法的源代码可知,创建线程池最终调用的构造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			  long keepAliveTime, TimeUnit unit,
			  BlockingQueue<Runnable> workQueue,
			  ThreadFactory threadFactory,
		          RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.acc = System.getSecurityManager() == null ?
			null :
			AccessController.getContext();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}

关于此构造方法中各参数的含义和作用,各位可以移步《高并发之——不得不说的线程池与ThreadPoolExecutor类浅析》进行查阅。

大家可以自行调用ThreadPoolExecutor类的构造方法来创建线程池。例如,我们可以使用如下形式创建线程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

使用ForkJoinPool类创建线程池

在Java8的Executors工具类中,新增了如下创建线程池的方式。

public static ExecutorService newWorkStealingPool(int parallelism) {
	return new ForkJoinPool
		(parallelism,
		 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
		 null, true);
}
public static ExecutorService newWorkStealingPool() {
	return new ForkJoinPool
		(Runtime.getRuntime().availableProcessors(),
		 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
		 null, true);
}

从源代码可以可以,本质上调用的是ForkJoinPool类的构造方法类创建线程池,而从代码结构上来看ForkJoinPool类继承自AbstractExecutorService抽象类。接下来,我们看下ForkJoinPool类的构造方法。

public ForkJoinPool() {
	this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
		 defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
	this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
				ForkJoinWorkerThreadFactory factory,
				UncaughtExceptionHandler handler,
				boolean asyncMode) {
	this(checkParallelism(parallelism),
		 checkFactory(factory),
		 handler,
		 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
		 "ForkJoinPool-" + nextPoolId() + "-worker-");
	checkPermission();
}
private ForkJoinPool(int parallelism,
				 ForkJoinWorkerThreadFactory factory,
				 UncaughtExceptionHandler handler,
				 int mode,
				 String workerNamePrefix) {
	this.workerNamePrefix = workerNamePrefix;
	this.factory = factory;
	this.ueh = handler;
	this.config = (parallelism & SMASK) | mode;
	long np = (long)(-parallelism); // offset ctl counts
	this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通过查看源代码得知,ForkJoinPool的构造方法,最终调用的是如下私有构造方法。

private ForkJoinPool(int parallelism,
				 ForkJoinWorkerThreadFactory factory,
				 UncaughtExceptionHandler handler,
				 int mode,
				 String workerNamePrefix) {
	this.workerNamePrefix = workerNamePrefix;
	this.factory = factory;
	this.ueh = handler;
	this.config = (parallelism & SMASK) | mode;
	long np = (long)(-parallelism); // offset ctl counts
	this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各参数的含义如下所示。

  • parallelism:并发级别。
  • factory:创建线程的工厂类对象。
  • handler:当线程池中的线程抛出未捕获的异常时,统一使用UncaughtExceptionHandler对象处理。
  • mode:取值为FIFO_QUEUE或者LIFO_QUEUE。
  • workerNamePrefix:执行任务的线程名称的前缀。

当然,私有构造方法虽然是参数最多的一个方法,但是其不会直接对外方法,我们可以使用如下方式创建线程池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

使用ScheduledThreadPoolExecutor类创建线程池

在Executors工具类中存在如下方法类创建线程池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
	return new DelegatedScheduledExecutorService
		(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
	return new DelegatedScheduledExecutorService
		(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
	return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
		int corePoolSize, ThreadFactory threadFactory) {
	return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

从源码来看,这几个方法本质上调用的都是
ScheduledThreadPoolExecutor类的构造方法,ScheduledThreadPoolExecutor中存在的构造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), threadFactory, handler);
}

而从代码结构上看,
ScheduledThreadPoolExecutor类继承自ThreadPoolExecutor类,本质上还是调用ThreadPoolExecutor类的构造方法,只不过此时传递的队列为DelayedWorkQueue。我们可以直接调用ScheduledThreadPoolExecutor类的构造方法来创建线程池,例如以如下形式创建线程池。

new ScheduledThreadPoolExecutor(3)
佭ϴý Ѷ Media8ý

在线客服

外链咨询

扫码加我微信

微信:juxia_com

返回顶部