Kafka 3.x metadata: MetadataCache #12

@2pc

Two implementations of MetadataCache

Because Kafka supports both raft mode and zk mode, there are two implementations: KRaftMetadataCache and ZkMetadataCache.

Note that the internal state object has a different name in each implementation:
1. In ZkMetadataCache it is called metadataSnapshot
2. In KRaftMetadataCache it is called _currentImage
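
To make the two-implementation layout concrete, here is a minimal Java sketch. It is not Kafka's code: SimpleMetadataCache, ZkStyleCache and KRaftStyleCache are hypothetical names, and a plain Map stands in for the real snapshot and image types. The assumption illustrated is that each cache keeps its whole state in one immutable object behind a volatile reference, so an update replaces the reference instead of mutating shared state.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for the MetadataCache abstraction.
interface SimpleMetadataCache {
    Set<String> topics();
}

// zk-style cache: state lives in an immutable snapshot (field named after metadataSnapshot).
final class ZkStyleCache implements SimpleMetadataCache {
    // topic name -> partition count; replaced wholesale on every update
    private volatile Map<String, Integer> metadataSnapshot = Map.of();

    void updateSnapshot(Map<String, Integer> partitionCounts) {
        metadataSnapshot = Map.copyOf(partitionCounts); // defensive immutable copy
    }

    @Override
    public Set<String> topics() {
        return metadataSnapshot.keySet();
    }
}

// raft-style cache: state lives in an immutable image (field named after _currentImage).
final class KRaftStyleCache implements SimpleMetadataCache {
    private volatile Map<String, Integer> currentImage = Map.of();

    void setImage(Map<String, Integer> newImage) {
        currentImage = Map.copyOf(newImage);
    }

    @Override
    public Set<String> topics() {
        return currentImage.keySet();
    }
}
```

Readers only ever see a complete old state or a complete new state, which is why a single reference swap is enough in this sketch.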

ZkMetadataCache update flow

In zk mode the update flow is easy to follow: KafkaApis handles it explicitly.

//KafkaApis.handleUpdateMetadataRequest-->ReplicaManager.maybeUpdateMetadataCache-->zkMetadataCache.updateMetadata
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)

In older versions, zkMetadataCache.updateMetadata simply returns deletedPartitions once the call completes.
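
The "apply the update, then hand back the deleted partitions" shape can be sketched as follows. This is an illustration under stated assumptions, not the real ZkMetadataCache: ZkUpdateSketch, leaderOf and the separate deletions parameter are hypothetical, and the state is reduced to a map from partition name to leader id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of an update call that reports which partitions it removed.
final class ZkUpdateSketch {
    // partition name -> leader broker id; swapped as a whole on every update
    private volatile Map<String, Integer> snapshot = Map.of();

    // Builds the next snapshot from the current one and returns the partitions
    // that were actually present and got removed by this update.
    List<String> updateMetadata(Map<String, Integer> upserts, Set<String> deletions) {
        Map<String, Integer> next = new HashMap<>(snapshot);
        List<String> deletedPartitions = new ArrayList<>();
        for (String partition : deletions) {
            if (next.remove(partition) != null) {
                deletedPartitions.add(partition);
            }
        }
        next.putAll(upserts);
        snapshot = Map.copyOf(next); // publish the new immutable snapshot
        return deletedPartitions;
    }

    // Returns the leader id, or null if the partition is unknown.
    Integer leaderOf(String partition) {
        return snapshot.get(partition);
    }
}
```

Returning the deleted partitions lets the caller react to removals without diffing the old and new snapshots itself.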

KRaftMetadataCache update

In raft mode, BrokerServer registers a BrokerMetadataListener with raftManager at startup.

metadataListener = new BrokerMetadataListener(config.nodeId,
                                              time,
                                              threadNamePrefix,
                                              config.metadataSnapshotMaxNewRecordBytes,
                                              metadataSnapshotter)
// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)

Internally, the listener ends up registered with KafkaRaftClient:

//KafkaRaftClient.java
public void register(Listener<T> listener) {
    pendingRegistrations.add(Registration.register(listener));
    wakeup();
}
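
The register method above only queues the registration; the listener becomes active later, on the polling thread. A minimal sketch of that hand-off is below. RegistrationSketch, activeListeners and activeCount are hypothetical names, a Consumer<String> stands in for Listener<T>, and the wakeup of the polling thread is left as a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch: registrations are queued by any thread and drained by the poll thread.
final class RegistrationSketch {
    // Thread-safe queue, written by callers of register()
    private final Queue<Consumer<String>> pendingRegistrations = new ConcurrentLinkedQueue<>();
    // Touched only by the polling thread, so it needs no synchronization
    private final List<Consumer<String>> activeListeners = new ArrayList<>();

    // May be called from any thread.
    void register(Consumer<String> listener) {
        pendingRegistrations.add(listener);
        // A real client would also wake the polling thread here.
    }

    // Called only from the polling thread: activate pending listeners, then notify all.
    void poll(String committedRecord) {
        Consumer<String> pending;
        while ((pending = pendingRegistrations.poll()) != null) {
            activeListeners.add(pending);
        }
        for (Consumer<String> listener : activeListeners) {
            listener.accept(committedRecord);
        }
    }

    int activeCount() {
        return activeListeners.size();
    }
}
```

Deferring activation to the polling thread keeps all listener state single-threaded, at the cost of a short delay between register and the first callback.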

Call chain of the poll loop:
KafkaRaftManager.run()-->KafkaRaftManager.doWork()-->KafkaRaftClient.poll

    /**
     * Poll for new events. This allows the client to handle inbound
     * requests and send any needed outbound requests.
     */
    public void poll() {
        pollListeners(); // BrokerMetadataListener is invoked here

        long currentTimeMs = time.milliseconds();
        if (maybeCompleteShutdown(currentTimeMs)) {
            return;
        }

        long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
        long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);

        kafkaRaftMetrics.updatePollStart(currentTimeMs);

        RaftMessage message = messageQueue.poll(pollTimeoutMs); // handle raft requests; ControllerApis dispatches on the different ApiKeys

        currentTimeMs = time.milliseconds();
        kafkaRaftMetrics.updatePollEnd(currentTimeMs);

        if (message != null) {
            handleInboundMessage(message, currentTimeMs); // process the request
        }
    }

BrokerMetadataListener

BrokerMetadataListener implements the RaftClient.Listener interface, which has two main methods:

void handleCommit(BatchReader<T> reader);
void handleSnapshot(SnapshotReader<T> reader);

Whenever either method fires, the _currentImage inside KRaftMetadataCache is updated; the change is published through a MetadataPublisher.

  /**
   * Handle new metadata records.
   */
  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
    eventQueue.append(new HandleCommitsEvent(reader))

  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
      extends EventQueue.FailureLoggingEvent(log) {
    override def run(): Unit = {
      val results = try {
        val loadResults = loadBatches(_delta, reader, None, None, None)
        if (isDebugEnabled) {
          debug(s"Loaded new commits: ${loadResults}")
        }
        loadResults
      } finally {
        reader.close()
      }
      _publisher.foreach(publish)

      snapshotter.foreach { snapshotter =>
        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
        if (shouldSnapshot()) {
          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
            _bytesSinceLastSnapshot = 0L
          }
        }
      }
    }
  }

Go straight to the publish method:

  private def publish(publisher: MetadataPublisher): Unit = {
    val delta = _delta
    _image = _delta.apply()
    _delta = new MetadataDelta(_image)
    publisher.publish(delta, _image)
  }
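
The image/delta hand-off in publish can be illustrated with a small Java sketch. It is a simplification under stated assumptions: DeltaImageSketch, Image, Delta and replay are hypothetical stand-ins (they are not Kafka's MetadataImage and MetadataDelta), and an image is reduced to a map from topic name to partition count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "apply delta, then start a fresh delta" cycle.
final class DeltaImageSketch {
    // Immutable view of the metadata at one point in time.
    static final class Image {
        final Map<String, Integer> topics;

        Image(Map<String, Integer> topics) {
            this.topics = Map.copyOf(topics);
        }
    }

    // Mutable set of changes accumulated on top of a base image.
    static final class Delta {
        private final Image base;
        private final Map<String, Integer> changes = new HashMap<>();

        Delta(Image base) {
            this.base = base;
        }

        void replay(String topic, int partitions) {
            changes.put(topic, partitions);
        }

        // Produces a new image; the base image is left untouched.
        Image apply() {
            Map<String, Integer> merged = new HashMap<>(base.topics);
            merged.putAll(changes);
            return new Image(merged);
        }
    }

    private Image image = new Image(Map.of());
    private Delta delta = new Delta(image);

    void replay(String topic, int partitions) {
        delta.replay(topic, partitions);
    }

    // Mirrors the shape of publish: materialize the image, then reset the delta on top of it.
    Image publish() {
        image = delta.apply();
        delta = new Delta(image);
        return image;
    }
}
```

Because every publish yields a new immutable image, an image handed out earlier is never modified by later records.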

KRaftMetadataCache update

BrokerMetadataPublisher then updates the image held by metadataCache (that is, it updates _currentImage):

override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
    metadataCache.setImage(newImage)
}
