Skip to content

dubbo comsuer线程池  #14

Description

@2pc

异常现象

在进行性能测试的时候发现,dubbo线程consumer服务直接线程打满超过 5000个线程,而实际配置的线程数 明明最多配置的是1000

原因

先看下consumer配置参数

  <dubbo:consumer timeout="1000" threads="1000"  connections="10" 

这里配置了10个连接,线程池大小1000;在只有一台provider的前提下,consumer怎么可能会超过5000个线程,加上容器最大线程限制,导致start完consumer服务直接卡死或者down掉了
实际上早起dubbo版本是一个连接一个线程池的,那么10个连接,自然就是最大10*1000

  public WrappedChannelHandler(ChannelHandler handler, URL url) {
      this.handler = handler;
      this.url = url;
      executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

      String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
      if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
          componentKey = CONSUMER_SIDE;
      }
      DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
      dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
  }
//默认使用fixed线程池
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

解决方案

可以参考官网issue

https://github.com/apache/dubbo/pull/4131

Reduce context switching cost by optimizing thread model on consumer side

可以升级版本或者自定义线程池
比如2.7.5

    public ExecutorService getExecutor(URL url) {
        String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        Map<Integer, ExecutorService> executors = data.get(componentKey);

        /**
         * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
         * have Executor instances generated and stored.
         */
        if (executors == null) {
            logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                    "before coming to here.");
            return null;
        }

        Integer portKey = url.getPort();
        ExecutorService executor = executors.get(portKey);
        if (executor != null) {
            if (executor.isShutdown() || executor.isTerminated()) {
                executors.remove(portKey);
                executor = createExecutor(url);
                executors.put(portKey, executor);//相同port共享
            }
        }
        return executor;
    }

3.x

1.先调用createExecutorIfAbsent 创建线程池,注意这里是以port为维度的,也就是同一个dubbo服务共享一个线程池
2.createExecutor具体的创建,这里才调用那几种fixed,cached,eager等
3.用的时候判断有没有就行了,没有的话只能用shared的共享线程池

    public synchronized ExecutorService createExecutorIfAbsent(URL url) {
        Map<Integer, ExecutorService> executors = data.computeIfAbsent(getExecutorKey(url), k -> new ConcurrentHashMap<>());
        // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
        Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
        if (url.getParameter(THREAD_NAME_KEY) == null) {
            url = url.putAttribute(THREAD_NAME_KEY, "Dubbo-protocol-" + portKey);
        }
        URL finalUrl = url;
        ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(finalUrl));
        // If executor has been shut down, create a new one
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(portKey);
            executor = createExecutor(url);//创建线程池
            executors.put(portKey, executor);
        }
        return executor;
    }

    /**
     * Return the executor key based on the type (internal or biz) of the current service.
     *
     * @param url
     * @return
     */
    private String getExecutorKey(URL url) {
        String executorKey = INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY;
        ServiceDescriptor serviceDescriptor = applicationModel.getInternalModule().getServiceRepository().lookupService(url.getServiceInterface());
        // if not found in internal service repository, then it's biz service defined by user.
        if (serviceDescriptor == null) {
            executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;

        }
        return executorKey;
    }

    private ExecutorService createExecutor(URL url) {
        return (ExecutorService) extensionAccessor.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    }

    @Override
    public ExecutorService getExecutor(URL url) {
        Map<Integer, ExecutorService> executors = data.get(getExecutorKey(url));

        /*
         * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
         * have Executor instances generated and stored.
         */
        if (executors == null) {
            logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                "before coming to here.");
            return null;
        }

        // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
        Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
        ExecutorService executor = executors.get(portKey);
        if (executor != null && (executor.isShutdown() || executor.isTerminated())) {//如果已经shutdown 直接清除缓存
            executors.remove(portKey);
            // Does not re-create a shutdown executor, use SHARED_EXECUTOR for downgrade.
            executor = null;
            logger.info("Executor for " + url + " is shutdown.");
        }
        if (executor == null) {//如果没有使用默认的共享线程池,是一个cache类型的
            return frameworkExecutorRepository.getSharedExecutor();
        } else {
            return executor;
        }
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions