MetadataCache两种实现
由于支持raft模式和zk模式,所以有两种实现KRaftMetadataCache,ZkMetadataCache
注意这两内部的对象比较特殊
1.ZkMetadataCache的内部叫metadataSnapshot
2.KRaftMetadataCache内部叫_currentImage
ZkMetadataCache更新流程
其更新流程zk模式很明显在KafkaApis内部显示处理
//KafkaApis.handleUpdateMetadataRequest-->ReplicaManager.maybeUpdateMetadataCache-->zkMetadataCache.updateMetadata
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
老版本zkMetadataCache.updateMetadata调用完直接返回deletedPartitions
KRaftMetadataCache更新
raft模式更新,BrokerServer启动的时候会向raftManager注册一个BrokerMetadataListener
metadataListener = new BrokerMetadataListener(config.nodeId,
time,
threadNamePrefix,
config.metadataSnapshotMaxNewRecordBytes,
metadataSnapshotter)
// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)
而内部是注册到KafkaRaftClient
//KafkaRaftClient.scala
public void register(Listener<T> listener) {
pendingRegistrations.add(Registration.register(listener));
wakeup();
}
poll loop调用逻辑
KafkaRaftManager.run()-->KafkaRaftManager.doWork()-->KafkaRaftClient.poll
/**
* Poll for new events. This allows the client to handle inbound
* requests and send any needed outbound requests.
*/
public void poll() {
pollListeners();//BrokerMetadataListener被调用
long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}
long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
RaftMessage message = messageQueue.poll(pollTimeoutMs);//处理raft请求,ControllerApis依据不同ApiKeys做不同的处理
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
if (message != null) {
handleInboundMessage(message, currentTimeMs);//req处理
}
}
BrokerMetadataListener
BrokerMetadataListener继承自RaftClient.Listener接口,主要两个方法
void handleCommit(BatchReader<T> reader);
void handleSnapshot(SnapshotReader<T> reader);
这两方法触发都会更新一遍KRaftMetadataCache内部叫_currentImage,通过一个MetadataPublisher发布事件
/**
* Handle new metadata records.
*/
override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
eventQueue.append(new HandleCommitsEvent(reader))
class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val results = try {
val loadResults = loadBatches(_delta, reader, None, None, None)
if (isDebugEnabled) {
debug(s"Loaded new commits: ${loadResults}")
}
loadResults
} finally {
reader.close()
}
_publisher.foreach(publish)
snapshotter.foreach { snapshotter =>
_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
if (shouldSnapshot()) {
if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
_bytesSinceLastSnapshot = 0L
}
}
}
}
}
直接看publish方法
private def publish(publisher: MetadataPublisher): Unit = {
val delta = _delta
_image = _delta.apply()
_delta = new MetadataDelta(_image)
publisher.publish(delta, _image)
}
RaftMetadataCace更新
给BrokerMetadataPublisher更新metadataCache的Image(也就是更新_currentImage)
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
metadataCache.setImage(newImage)
}
MetadataCache两种实现
由于支持raft模式和zk模式,所以有两种实现KRaftMetadataCache,ZkMetadataCache
注意这两内部的对象比较特殊
1.ZkMetadataCache的内部叫metadataSnapshot
2.KRaftMetadataCache内部叫_currentImage
ZkMetadataCache更新流程
其更新流程zk模式很明显在KafkaApis内部显示处理
老版本zkMetadataCache.updateMetadata调用完直接返回deletedPartitions
KRaftMetadataCache更新
raft模式更新,BrokerServer启动的时候会向raftManager注册一个BrokerMetadataListener
而内部是注册到KafkaRaftClient
poll loop调用逻辑
KafkaRaftManager.run()-->KafkaRaftManager.doWork()-->KafkaRaftClient.poll
BrokerMetadataListener
BrokerMetadataListener继承自RaftClient.Listener接口,主要两个方法
这两方法触发都会更新一遍KRaftMetadataCache内部叫_currentImage,通过一个MetadataPublisher发布事件
直接看publish方法
RaftMetadataCace更新
给BrokerMetadataPublisher更新metadataCache的Image(也就是更新_currentImage)