关键词:
Thread、RunnableThreadPoolExecutor、ScheduledThreadPoolExecutorThread Starvation Dead Lock)Executors两种方式
Thread类并重写run方法 Thread thread = new Thread() {
@Override
public void run() {
log.debug("thread test");
}
};
Runnable接口 Runnable runnable = () -> {
log.debug("thread runnable test");
};
Thread thread = new Thread(runnable);
要想启动一个线程,只需调用该线程的start方法即可
thread.start();
线程池接口java.util.concurrent.Executor,其主要实现类有
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ScheduledThreadPoolExecutor
java.util.concurrent.ForkJoinPool
这三个类都直接或间接的继承了抽象类java.util.concurrent.AbstractExecutorService。
构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
还有几个重载的构造器与此类似。
参数说明:
corePoolSize:线程池中保留的线程数,即使线程是空闲的,除非设置了参数allowCoreThreadTimeOut
maximumPoolSize:线程池中能达到的最大线程数
keepAliveTime/unit:当线程池中线程数超过corePoolSize,这是这些线程在被终止前等待新任务的最大时间
workQueue:任务在被执行之前保存在该队列中,该队列可以是有届队列也可以是无届队列
threadFactory:线程工厂,创建新线程
handler:决定了当线程池中线程数达到maximumPoolSize并且workQueue已满时新提交的任务会被怎么处理。
Java提供了4个实现:
AbortPolicy:直接拒绝新提交的任务并抛出RejectedExecutionExceptionDiscardOldestPolicy:忽略先前的未被执行的任务并尝试执行当前新提交的任务CallerRunsPolicy:直接在调用线程池的execute的线程中执行新提交的任务DiscardPolicy:直接忽略新提交的任务corePoolSize、maximumPoolSize、workQueue、handler四者之间的关
corePoolSize时,创建新线程corePoolSize后并且队列未满时,将任务放入workQueuecorePoolSize后并且队列已满而线程数还未达到maximumPoolSize时,创建新线程maximumPoolSize后,新任务将被handler处理 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4,
8,
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
Executors.defaultThreadFactory(),
(r, executor) -> log.debug("just ignore {},executor {}", r, executor));
log.debug("executor {}", threadPoolExecutor);
for (int i = 0; i < 16; i++) {
threadPoolExecutor.submit(() -> {
try {
Thread.sleep(3000);
log.debug("thread {} execute", Thread.currentThread());
} catch (InterruptedException e) {
}
});
}
threadPoolExecutor.shutdown();
示例代码创建了一个corePoolSize = 4、maximumPoolSize = 8、workQueue size = 2、handler为仅打印并忽略任务的线程池。
在示例代码中,向线程池提交了16个任务,从日志中观察可知,其中执行了10个,其余6个都被忽略了。
ScheduledThreadPoolExecutor是一个定时调度器,实现了定点执行任务与周期性执行任务。
它继承了java.util.concurrent.ThreadPoolExecutor并实现了接口java.util.concurrent.ScheduledExecutorService。
构造器与ThreadPoolExecutor类似,提交定时或周期性任务使用scheduleXXX,不再啰嗦。
线程饥饿死锁(Thread Starvation Dead Lock)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,
8,
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
Executors.defaultThreadFactory(),
(r, executor) -> log.debug("just ignore {},executor {}", r, executor));
Callable<String> taskA = () -> {
Callable<String> taskB = () -> "task b";
Future<String> future = threadPoolExecutor.submit(taskB);
String resultOfTaskB = future.get();
log.debug("task b result: {}", resultOfTaskB);
return "task a ".concat(resultOfTaskB);
};
Future<String> future = threadPoolExecutor.submit(taskA);
String resultOfTaskA = future.get();
log.debug("task a result: {}", resultOfTaskA);
threadPoolExecutor.shutdown();
如上代码,taskA将任务taskB提交到同一个Executor实例中,由于corePoolSiz = 1,
taskB将进入workQueue中等待taskA执行结束,而taskA又在等待taskB的执行结果,从而导致了taskA、taskB互相等待对方的执行。
这种情况叫做线程饥饿死锁(Thread Starvation Dead Lock)。
解决线程饥饿死锁的方法:
corePoolSize值设置为足够支持你的线程数的值(这样如果你不设置allowCoreThreadTimeOut,你的空闲线程将不能被回收)taskB创建另一个线程池corePoolSize值设置为0,将maximumPoolSize设置为足够支持你的线程数的值(确保你的线程数不会达到该值,否则新提交的任务将会被handler处理),
并且workQueue使用SynchronousQueue(Executors的newCachedThreadPool方法就是这样做的)所以,尽量不要在任务内部向同一个Executor实例提交任务,以免造成线程饥饿死锁。
ThreadPoolExecutor提供了三个未做任何操作的空方法,如下:
//任务执行前被调用
protected void beforeExecute(Thread t, Runnable r) { }
//任务执行后被调用
protected void afterExecute(Runnable r, Throwable t) { }
/线程池关闭时被调用
protected void terminated() { }
可以重写这三个方法来实现对ThreadPoolExecutor的扩展,比如实现任务执行耗时统计/日志记录等。
示例代码
//扩展ThreadPoolExecutor
class ExtThreadPoolExecutor extends ThreadPoolExecutor {
//记录线程开始执行的时间
private ThreadLocal<Long> startThreadLocal = new ThreadLocal<>();
//记录线程执行的总时间
private AtomicLong totalTime = new AtomicLong(0);
//记录已执行的线程数
private AtomicLong totalCount = new AtomicLong(0);
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startThreadLocal.set(System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null) {
Long start = startThreadLocal.get();
startThreadLocal.remove();
long thisTime = System.currentTimeMillis() - start;
long count = totalCount.addAndGet(1);
long total = totalTime.addAndGet(thisTime);
log.debug("task {} taking {} ms, average taking {} ms", r, thisTime, total / count);
} else {
log.error("task {} exception {}", r, t);
}
}
@Override
protected void terminated() {
super.terminated();
log.debug("threadPoolExecutor {} terminated", this);
}
}
//测试自己扩展的ExtThreadPoolExecutor
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor = new ExtThreadPoolExecutor(
nThreads,
nThreads << 1,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
for (int i = 0; i < nThreads << 1; i++) {
threadPoolExecutor.submit(() -> {
try {
Thread.sleep(new Random().nextInt(3000));
log.debug("current time: {}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolExecutor.shutdown();
Java在其类库中java.util.concurrent.Executors中提供了些创建线程池的静态工厂方法:
newFixedThreadPool:创建一个corePoolSize = maximumPoolSize 、workQueue无界的线程池newWorkStealingPool:创建一个ForkJoinPoolnewCachedThreadPool:创建一个corePoolSize = 0、maximumPoolSize = Integer.MAX_VALUE、workQueue为SynchronousQueue的线程池。
当一个新任务被提交到SynchronousQueue时,必须有另一个线程正在等待接受这个任务。如果没有线程正在等待,且线程数未达到最大线程数,则创建新的线程;否则任务将会被handler处理。newScheduledThreadPool:创建一个定时任务线程池newSingleThreadExecutor:创建一个单线程的线程池newSingleThreadScheduledExecutor:创建一个单线程的定时任务线程池