Symptom
During performance testing, we found that the Dubbo consumer service's thread count blew up past 5000, even though the configured maximum was only 1000 threads.
Cause
First, look at the consumer configuration:
<dubbo:consumer timeout="1000" threads="1000" connections="10" />
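This sets 10 connections and a pool size of 1000. With only one provider, how could the consumer exceed 5000 threads? Because the container also enforces a maximum thread limit, the consumer service hung or went down right after startup.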
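In fact, early Dubbo versions created one thread pool per connection, so 10 connections naturally allow up to 10 × 1000 = 10,000 threads. The executor is built in the WrappedChannelHandler constructor: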
public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    // a brand-new executor for every handler, i.e. for every connection
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    // same key for every connection to this port: the store keeps only the latest executor,
    // but the earlier ones stay alive inside their own handlers
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
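To make the 10 × 1000 arithmetic concrete, here is a minimal JDK-only sketch (not Dubbo code). The `Handler` class is a hypothetical stand-in for the early WrappedChannelHandler, and the sketch assumes one handler, and therefore one fixed-size pool, per connection as described above. It sums each pool's maximum size to get the worst-case thread count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PerConnectionPoolDemo {
    // Hypothetical stand-in for the early WrappedChannelHandler:
    // every instance (one per connection) builds its own fixed-size pool.
    static final class Handler {
        final ThreadPoolExecutor executor;

        Handler(int threads) {
            executor = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    new SynchronousQueue<Runnable>());
        }
    }

    // Upper bound on consumer-side threads: the sum of every pool's maximum size.
    static long worstCaseThreads(int connections, int threads) {
        List<Handler> handlers = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            handlers.add(new Handler(threads)); // no worker threads start until tasks arrive
        }
        long total = 0;
        for (Handler h : handlers) {
            total += h.executor.getMaximumPoolSize();
            h.executor.shutdown();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseThreads(10, 1000)); // prints 10000
    }
}
```

Each pool is individually capped at `threads`, but nothing caps the number of pools, which is why the per-process total can far exceed the configured value.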
// FixedThreadPool is the default ThreadPool implementation
public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // core == max == threads: each pool created here can grow to `threads` threads
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
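With `queues` at its default of 0, the fixed pool hands tasks over through a `SynchronousQueue`, so once all `threads` workers are busy, further tasks are rejected rather than queued. The sketch below reproduces that behavior with plain JDK classes; the JDK's `ThreadPoolExecutor.AbortPolicy` stands in for Dubbo's `AbortPolicyWithReport`, and the pool size of 2 is an arbitrary choice for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolRejectDemo {
    // Fill a fixed pool (core == max == threads, SynchronousQueue as for queues == 0)
    // with `tasks` blocking tasks and count how many are rejected.
    static int countRejected(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep this worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // every worker is busy and the queue cannot hold anything
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(2, 3)); // prints 1
    }
}
```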
Solution
See the upstream pull request:
https://github.com/apache/dubbo/pull/4131
Reduce context switching cost by optimizing thread model on consumer side
You can either upgrade Dubbo or plug in a custom thread pool.
For example, in 2.7.5 executors are looked up from a repository keyed by port:
public ExecutorService getExecutor(URL url) {
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.get(componentKey);
    /**
     * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
     * have Executor instances generated and stored.
     */
    if (executors == null) {
        logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                "before coming to here.");
        return null;
    }
    Integer portKey = url.getPort();
    ExecutorService executor = executors.get(portKey);
    if (executor != null) {
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(portKey);
            executor = createExecutor(url);
            executors.put(portKey, executor); // one executor shared per port
        }
    }
    return executor;
}
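The key change is that executors are cached in a map keyed by port, so all connections to the same provider port resolve to a single pool. Below is a simplified, hypothetical sketch of that idea in plain Java (not the Dubbo class): it folds the create-if-absent step into the lookup and, like the 2.7.5 code above, replaces an executor that has been shut down.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PortSharedExecutorDemo {
    // Hypothetical simplified repository: port -> executor, shared by every connection to that port.
    private final ConcurrentMap<Integer, ExecutorService> executors = new ConcurrentHashMap<>();
    private final int threads;

    PortSharedExecutorDemo(int threads) {
        this.threads = threads;
    }

    ExecutorService getExecutor(int port) {
        // compute() is atomic per key on ConcurrentHashMap: create the pool on first use,
        // replace it if it has been shut down, otherwise return the cached one.
        return executors.compute(port, (p, existing) ->
                existing == null || existing.isShutdown() ? Executors.newFixedThreadPool(threads) : existing);
    }

    int distinctExecutors() {
        return executors.size();
    }

    void shutdownAll() {
        executors.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        PortSharedExecutorDemo repo = new PortSharedExecutorDemo(1000);
        for (int connection = 0; connection < 10; connection++) {
            repo.getExecutor(20880); // 10 connections to one provider port
        }
        System.out.println(repo.distinctExecutors()); // prints 1
        repo.shutdownAll();
    }
}
```

With this layout the 10-connection configuration from the top of the post is bounded by one pool of 1000 threads per provider port instead of ten.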
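In 3.x:
1. createExecutorIfAbsent is called first to create the pool. The cache key is port-based: on the provider side there is one pool per protocol port, while on the consumer side the key is fixed at Integer.MAX_VALUE, so consumer-side business services all share a single pool.
2. createExecutor does the actual construction; this is where the fixed, cached, eager, etc. ThreadPool implementations are invoked.
3. At call time, getExecutor only checks whether a cached executor exists; if it is missing or has been shut down, it falls back to the shared executor.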
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    Map<Integer, ExecutorService> executors = data.computeIfAbsent(getExecutorKey(url), k -> new ConcurrentHashMap<>());
    // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
    Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
    if (url.getParameter(THREAD_NAME_KEY) == null) {
        url = url.putAttribute(THREAD_NAME_KEY, "Dubbo-protocol-" + portKey);
    }
    URL finalUrl = url;
    ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(finalUrl));
    // If executor has been shut down, create a new one
    if (executor.isShutdown() || executor.isTerminated()) {
        executors.remove(portKey);
        executor = createExecutor(url); // create the thread pool
        executors.put(portKey, executor);
    }
    return executor;
}
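On the creation side, the consumer and provider differ only in the cache key. The following is a hypothetical simplification in plain Java: it hard-codes the string "consumer" in place of CONSUMER_SIDE, uses a fixed pool in place of the ThreadPool extension, and ignores the internal/biz split done by getExecutorKey. It shows consumer lookups for different provider ports landing on one pool while each provider port gets its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SideKeyedExecutorDemo {
    // Same rule as portKey above: one global key for consumers, the protocol port for providers.
    static int portKey(String side, int port) {
        return "consumer".equalsIgnoreCase(side) ? Integer.MAX_VALUE : port;
    }

    private final ConcurrentMap<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

    ExecutorService createExecutorIfAbsent(String side, int port, int threads) {
        return executors.computeIfAbsent(portKey(side, port), k -> Executors.newFixedThreadPool(threads));
    }

    int size() {
        return executors.size();
    }

    void shutdownAll() {
        executors.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        SideKeyedExecutorDemo repo = new SideKeyedExecutorDemo();
        // consumer references to two different provider ports share one pool
        ExecutorService a = repo.createExecutorIfAbsent("consumer", 20880, 1000);
        ExecutorService b = repo.createExecutorIfAbsent("consumer", 20881, 1000);
        System.out.println(a == b); // prints true
        // providers exported on two ports get separate pools
        repo.createExecutorIfAbsent("provider", 20880, 200);
        repo.createExecutorIfAbsent("provider", 20881, 200);
        System.out.println(repo.size()); // prints 3
        repo.shutdownAll();
    }
}
```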
/**
 * Return the executor key based on the type (internal or biz) of the current service.
 *
 * @param url
 * @return
 */
private String getExecutorKey(URL url) {
    String executorKey = INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
    ServiceDescriptor serviceDescriptor = applicationModel.getInternalModule().getServiceRepository().lookupService(url.getServiceInterface());
    // if not found in internal service repository, then it's biz service defined by user.
    if (serviceDescriptor == null) {
        executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    }
    return executorKey;
}
private ExecutorService createExecutor(URL url) {
    return (ExecutorService) extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
@Override
public ExecutorService getExecutor(URL url) {
    Map<Integer, ExecutorService> executors = data.get(getExecutorKey(url));
    /*
     * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
     * have Executor instances generated and stored.
     */
    if (executors == null) {
        logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                "before coming to here.");
        return null;
    }
    // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
    Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
    ExecutorService executor = executors.get(portKey);
    if (executor != null && (executor.isShutdown() || executor.isTerminated())) { // already shut down: evict it from the cache
        executors.remove(portKey);
        // Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
        executor = null;
        logger.info("Executor for " + url + " is shutdown.");
    }
    if (executor == null) { // nothing cached: fall back to the default shared pool, which is a cached pool
        return frameworkExecutorRepository.getSharedExecutor();
    } else {
        return executor;
    }
}
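Unlike 2.7.5, 3.x does not rebuild a shut-down executor on lookup; it evicts it and downgrades to the shared executor. The following is a minimal sketch of that lookup policy in plain Java, with a JDK cached pool as a hypothetical stand-in for frameworkExecutorRepository.getSharedExecutor() and `register` as a hypothetical helper in place of createExecutorIfAbsent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedFallbackDemo {
    // Hypothetical stand-in for frameworkExecutorRepository.getSharedExecutor().
    private final ExecutorService shared = Executors.newCachedThreadPool();
    private final ConcurrentMap<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

    // Hypothetical helper standing in for createExecutorIfAbsent.
    void register(int key, ExecutorService executor) {
        executors.put(key, executor);
    }

    ExecutorService getExecutor(int key) {
        ExecutorService executor = executors.get(key);
        if (executor != null && executor.isShutdown()) { // isTerminated() implies isShutdown()
            executors.remove(key, executor); // evict the stale entry, but do not re-create it
            executor = null;
        }
        return executor == null ? shared : executor; // downgrade to the shared pool
    }

    ExecutorService sharedExecutor() {
        return shared;
    }

    public static void main(String[] args) {
        SharedFallbackDemo repo = new SharedFallbackDemo();
        ExecutorService consumerPool = Executors.newFixedThreadPool(4);
        repo.register(Integer.MAX_VALUE, consumerPool);
        System.out.println(repo.getExecutor(Integer.MAX_VALUE) == consumerPool);           // prints true
        consumerPool.shutdown();
        System.out.println(repo.getExecutor(Integer.MAX_VALUE) == repo.sharedExecutor()); // prints true
        System.out.println(repo.getExecutor(42) == repo.sharedExecutor());                 // prints true
        repo.sharedExecutor().shutdown();
    }
}
```

The trade-off is that a missing or closed business pool never blocks a call; the request still runs, just on the shared pool.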