一, Dubbo eager
TaskQueue 继承自 LinkedBlockingQueue
/**
 * Work queue used by {@link EagerThreadPoolExecutor}.
 * <p>
 * {@link #offer(Runnable)} accepts a task when the executor's submitted task count is lower
 * than the current pool size, or when the pool size has already reached the executor's
 * maximum pool size. Otherwise it returns {@code false}, which makes the executor create a
 * new worker while the number of tasks is bigger than corePoolSize but less than
 * maximumPoolSize.
 *
 * @param <R> task type parameter; not referenced by the queue itself
 */
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    /** Executor this queue belongs to; must be set via {@link #setExecutor} before use. */
    private EagerThreadPoolExecutor executor;

    /**
     * Creates a queue bounded to the given capacity.
     *
     * @param capacity maximum number of queued tasks
     */
    public TaskQueue(int capacity) {
        super(capacity);
    }

    /**
     * Attaches the executor whose pool size and task count drive {@link #offer(Runnable)}.
     *
     * @param exec the owning executor
     */
    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    /**
     * Decides whether the task is queued or refused so that the executor adds a worker.
     *
     * @param runnable the task to enqueue
     * @return {@code true} if the task was queued; {@code false} if it was refused (either to
     *         trigger creation of a new worker, or because the queue is full)
     * @throws RejectedExecutionException if no executor has been attached
     */
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * Retries to offer a task straight into the underlying queue, bypassing the
     * worker-creation logic of {@link #offer(Runnable)}.
     *
     * @param o       task
     * @param timeout how long to wait for free space before giving up
     * @param unit    time unit of {@code timeout}
     * @return offer success or not
     * @throws RejectedExecutionException if no executor has been attached or the executor
     *                                    is shut down
     * @throws InterruptedException       if interrupted while waiting for free space
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        // Same guard as offer(): fail with a rejection rather than a NullPointerException
        // when the queue has not been wired to an executor yet.
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
EagerThreadPoolExecutor
1. 默认线程池 ThreadPoolExecutor:线程数达到核心线程数后,新提交的任务先进入队列,队列满了才会继续创建线程(直到最大线程数)
2. Eager 线程池:线程数达到核心线程数后,会优先继续创建线程,直到达到最大线程数后新任务才进入队列
/**
 * A {@link ThreadPoolExecutor} that keeps a count of submitted-but-unfinished tasks.
 * Its {@link TaskQueue} reads that count to decide whether a task should be queued
 * or refused so that a new worker gets created.
 */
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Number of tasks handed to {@link #execute(Runnable)} that have not finished yet.
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return number of tasks currently submitted and not yet completed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    /**
     * Releases the slot taken in {@link #execute(Runnable)} once a task has run.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    /**
     * Submits a task. If the pool rejects it, one immediate retry is made to put the task
     * directly into the queue.
     *
     * @param command the task to run
     * @throws NullPointerException       if {@code command} is null
     * @throws RejectedExecutionException if the task can be neither executed nor queued
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // (the count is incremented here, at submission time, so queued tasks are included)
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue<?> queue = (TaskQueue<?>) super.getQueue();
            boolean queued = false;
            try {
                queued = queue.retryOffer(command, 0, TimeUnit.MILLISECONDS);
                if (!queued) {
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(x);
            } finally {
                // Covers every failure path of the retry, including a
                // RejectedExecutionException thrown by retryOffer itself, which the
                // sibling catch (Throwable) below does not see.
                if (!queued) {
                    submittedTaskCount.decrementAndGet();
                }
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}
看起来与tomcat线程池类似
二,Tomcat
tomcat ThreadPoolExecutor
//org.apache.tomcat.util.threads.ThreadPoolExecutor
/**
 * Executes the command, counting it as submitted. If the pool rejects it and the work
 * queue is a {@code TaskQueue}, the command is forced onto the queue, waiting up to the
 * given timeout for space.
 *
 * @param command the task to run
 * @param timeout how long to wait for queue space when the task is forced onto the queue
 * @param unit    time unit of {@code timeout}
 * @throws RejectedExecutionException if the task can be neither executed nor queued
 */
public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        boolean queued = false;
        try {
            if (!(super.getQueue() instanceof TaskQueue)) {
                throw rx;
            }
            final TaskQueue queue = (TaskQueue) super.getQueue();
            queued = queue.force(command, timeout, unit);
            if (!queued) {
                // keep the original rejection as the cause for diagnostics
                throw new RejectedExecutionException("Queue capacity is full.", rx);
            }
        } catch (InterruptedException x) {
            // restore the interrupt flag so the caller can still observe it
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(x);
        } finally {
            // single place that undoes the increment on every failed path,
            // including an exception thrown by force() itself
            if (!queued) {
                submittedCount.decrementAndGet();
            }
        }
    }
}
Tomcat TaskQueue 的 force 方法与 Dubbo TaskQueue 的 retryOffer 方法作用相同:任务被拒绝后,再尝试把它直接放入队列
/**
 * Puts a task onto the queue directly; intended for tasks the executor has rejected.
 *
 * @param o       the task to enqueue
 * @param timeout how long to wait for free space
 * @param unit    time unit of {@code timeout}
 * @return whether the task was enqueued
 * @throws RejectedExecutionException if no parent executor is set or it is shut down
 * @throws InterruptedException       if interrupted while waiting
 */
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    final boolean executorRunning = parent != null && !parent.isShutdown();
    if (!executorRunning) {
        throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
    }
    // timed offer of the superclass, skipping the checks done in offer(Runnable)
    return super.offer(o, timeout, unit);
}
/**
 * Queues the task, or returns {@code false} while the parent pool can still grow so
 * that a new thread is created instead.
 *
 * @param o the task to enqueue
 * @return {@code true} if queued; {@code false} if refused
 */
@Override
public boolean offer(Runnable o) {
    // no parent executor: no checks are possible, plain queue behaviour
    if (parent == null) {
        return super.offer(o);
    }
    // thread count is at the maximum: just queue the task
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
        return super.offer(o);
    }
    // fewer submitted tasks than threads, i.e. there are idle threads: queue it
    if (parent.getSubmittedCount() < parent.getPoolSize()) {
        return super.offer(o);
    }
    // fewer threads than the maximum: refuse, forcing creation of a new thread
    if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
        return false;
    }
    // nothing above applied: queue the task
    return super.offer(o);
}
catalina使用的线程池StandardThreadExecutor,这个也是封装的上边的ThreadPoolExecutor
/**
 * Builds the task queue and the backing executor, optionally prestarts the core
 * threads, links the queue to the executor and moves the component to STARTING.
 *
 * @throws LifecycleException declared by the lifecycle contract
 */
protected void startInternal() throws LifecycleException {
    taskqueue = new TaskQueue(maxQueueSize);

    final TaskThreadFactory workerFactory =
            new TaskThreadFactory(namePrefix, daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(
            getMinSpareThreads(), getMaxThreads(),
            maxIdleTime, TimeUnit.MILLISECONDS,
            taskqueue, workerFactory);
    executor.setThreadRenewalDelay(threadRenewalDelay);

    if (prestartminSpareThreads) {
        executor.prestartAllCoreThreads();
    }

    // the queue gets its parent only after the executor exists
    taskqueue.setParent(executor);
    setState(LifecycleState.STARTING);
}
/**
 * Delegates to the wrapped executor's timed {@code execute}.
 *
 * @param command the task to run
 * @param timeout timeout passed through to the wrapped executor
 * @param unit    time unit of {@code timeout}
 * @throws IllegalStateException if the wrapped executor has not been created
 */
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
    if (executor == null) {
        throw new IllegalStateException("StandardThreadExecutor not started.");
    }
    executor.execute(command, timeout, unit);
}
/**
 * Delegates to the wrapped executor; if the task is rejected, tries once more to
 * force it onto the task queue.
 *
 * @param command the task to run
 * @throws IllegalStateException      if the wrapped executor has not been created
 * @throws RejectedExecutionException if the task is rejected and cannot be forced
 *                                    onto the queue
 */
@Override
public void execute(Runnable command) {
    if (executor == null) {
        // message names this class, matching the timed execute overload
        throw new IllegalStateException("StandardThreadExecutor not started.");
    }
    try {
        executor.execute(command);
    } catch (RejectedExecutionException rx) {
        // there could have been contention around the queue
        if (!((TaskQueue) executor.getQueue()).force(command)) {
            // keep the original rejection as the cause
            throw new RejectedExecutionException("Work queue full.", rx);
        }
    }
}
但是TaskQueue用的是同一个
/**
 * Forces a task onto the queue; to be used when the task has been rejected.
 *
 * @param o       the task to enqueue
 * @param timeout how long to wait for free space
 * @param unit    time unit of {@code timeout}
 * @return whether the task was enqueued
 * @throws RejectedExecutionException if no parent executor is set or it is shut down
 * @throws InterruptedException       if interrupted while waiting
 */
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if (parent == null) {
        throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
    }
    if (parent.isShutdown()) {
        throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
    }
    return super.offer(o, timeout, unit);
}
/**
 * Offers a task to the queue, refusing it only while the parent pool has fewer
 * threads than its maximum and no idle thread is available.
 *
 * @param o the task to enqueue
 * @return {@code true} if queued; {@code false} if refused
 */
@Override
public boolean offer(Runnable o) {
    if (parent != null) {
        // maxed out on threads: queue the task
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        // idle threads exist: queue the task
        if (parent.getSubmittedCount() < parent.getPoolSize()) {
            return super.offer(o);
        }
        // below the maximum: refuse so that a new thread is created
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }
    }
    // either no parent to consult, or none of the cases above applied
    return super.offer(o);
}
三 Jetty
Jetty 线程池看起来简单得多:直接把任务丢给 queue,必要时再启动新线程
QueuedThreadPool
/**
 * Starts the pool: resets the started-thread counter, creates the job queue if none
 * has been set, and starts threads until the minimum thread count is reached.
 */
@Override
protected void doStart() throws Exception
{
    super.doStart();
    _threadsStarted.set(0);

    if (_jobs==null)
    {
        // bounded array queue when a max queue size is configured, otherwise a
        // BlockingArrayQueue built from the minimum thread count
        if (_maxQueued>0)
        {
            _jobs=new ArrayBlockingQueue<Runnable>(_maxQueued);
        }
        else
        {
            _jobs=new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
        }
    }

    // re-read the counter after every start attempt
    for (int started=_threadsStarted.get(); isRunning() && started<_minThreads; started=_threadsStarted.get())
    {
        startThread(started);
    }
}
/**
 * Dispatches the job, failing if it was not accepted.
 *
 * @param job the job to run
 * @throws RejectedExecutionException if {@link #dispatch(Runnable)} returns false
 */
public void execute(Runnable job)
{
    final boolean accepted = dispatch(job);
    if (!accepted)
    {
        throw new RejectedExecutionException();
    }
}
/**
 * Offers the job to the job queue and, if needed, starts one more thread.
 *
 * @param job the job to queue
 * @return true if the job was queued; false if the pool is not running or the
 *         queue refused the job
 */
public boolean dispatch(Runnable job)
{
    if (!isRunning())
    {
        return false;
    }

    // both values are sampled before the job is offered
    final int queuedBefore = _jobs.size();
    final int idleThreads = getIdleThreads();
    if (!_jobs.offer(job))
    {
        return false;
    }

    // no idle threads, or more jobs were already queued than there are idle threads
    final boolean needsThread = idleThreads==0 || queuedBefore>idleThreads;
    if (needsThread)
    {
        final int started=_threadsStarted.get();
        if (started<_maxThreads)
        {
            startThread(started);
        }
    }
    return true;
}
一, Dubbo eager
TaskQueue 继承自 LinkedBlockingQueue
EagerThreadPoolExecutor
1. 默认线程池 ThreadPoolExecutor:线程数达到核心线程数后,新提交的任务先进入队列,队列满了才会继续创建线程(直到最大线程数)
2. Eager 线程池:线程数达到核心线程数后,会优先继续创建线程,直到达到最大线程数后新任务才进入队列
看起来与tomcat线程池类似
二,Tomcat
tomcat ThreadPoolExecutor
Tomcat TaskQueue 的 force 方法与 Dubbo TaskQueue 的 retryOffer 方法作用相同:任务被拒绝后,再尝试把它直接放入队列
catalina使用的线程池StandardThreadExecutor,这个也是封装的上边的ThreadPoolExecutor
但是TaskQueue用的是同一个
三 Jetty
Jetty 线程池看起来简单得多:直接把任务丢给 queue,必要时再启动新线程
QueuedThreadPool