feat: Optimize the event service #5
Conversation
feat(event): optimize the event service
📋 Review Summary
This Pull Request introduces a new event service architecture to optimize event handling, addressing issue tronprotocol#6153. It involves significant refactoring of event-related logic, the addition of new services, and new unit tests. The changes aim to improve the flexibility and efficiency of event processing within the system.
🔍 General Feedback
- The new event service components (BlockEventCache, BlockEventGet, BlockEventLoad, EventService, HistoryEventService, RealtimeEventService, SolidEventService) are well-structured and introduce a more modular approach to event handling.
- Comprehensive unit tests have been added for the new event service components, which is a positive aspect for ensuring code quality and correctness.
- The use of versioning in the EventPluginLoader and EventService is noted. While it provides a mechanism for a phased rollout, care must be taken to manage these version dependencies, especially within core Manager logic, to prevent increased complexity and tightly coupled modules.
- Several minor style and magic-number issues were identified, which can be addressed to further improve code quality and maintainability.
@Override
public String toString() {
  return new StringBuilder().append("triggerName: ").append(getTriggerName())
🟢 The change in toString() methods improves readability and consistency.
Suggested change:
  return new StringBuilder().append("triggerName: ").append(getTriggerName())
      .append(", timestamp: ")
@Override
public String toString() {
  return new StringBuilder().append("triggerName: ").append(getTriggerName())
🟢 The change in toString() methods improves readability and consistency.
Suggested change:
  return new StringBuilder().append("triggerName: ").append(getTriggerName())
      .append(", timestamp: ")
    .getLatestBlockHeaderHash()
    .equals(binaryTree.getValue().peekLast().getParentHash())) {
  reOrgContractTrigger();
  if (EventPluginLoader.getInstance().getVersion() == 0) {
🟠 High: This conditional logic introduces a dependency on the EventPluginLoader version within a core Manager class. While it might be for a phased rollout, it adds complexity and potential for unexpected behavior if versioning is not robustly managed throughout the system. Consider if this logic can be abstracted or configured more cleanly to avoid direct version checks in core logic.
Suggested change:
  if (EventPluginLoader.getInstance().getVersion() == 0) {
    reOrgContractTrigger();
  }
}
void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
  // post block and logs for jsonrpc
  try {
    if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
      postBlockFilter(block, false);
      postLogsFilter(block, false, false);
🟠 High: Similar to the above, embedding version-dependent logic directly in blockTrigger can make the event processing flow harder to reason about and maintain. The current implementation essentially bypasses the old trigger mechanism for version != 0 and just updates lastUsedSolidityNum. This is a significant change in behavior based on a version flag.
Suggested change:
  void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
    // post block and logs for jsonrpc
    try {
      if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
        postBlockFilter(block, false);
        postLogsFilter(block, false, false);
      }
      if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
        postSolidityFilter(oldSolid, newSolid);
      }
      if (EventPluginLoader.getInstance().getVersion() != 0) {
        lastUsedSolidityNum = newSolid;
        return;
      }
      postLogsFilter(block, false, false);
    }
    if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
🟡 Medium: The removal of oldSolidNum as a parameter suggests it's no longer needed for postSolidityTrigger. This is an API change. Ensure that oldSolidNum is genuinely not required and that all callers are updated.
Suggested change:
  if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
    postSolidityTrigger(newSolid);
🟢 Low: No newline at end of file.
🟢 Low: No newline at end of file.
for (BlockCapsule blockCapsule : capsuleList) {
  postBlockFilter(blockCapsule, true);
  postLogsFilter(blockCapsule, true, false);
}
🟡 Medium: Consistent with the previous change where oldSolidNum was removed. The logic related to postSolidityFilter has been moved to blockTrigger.
Suggested change:
  }
  private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
      logger.info("Too many filters, logs filter lost: {}.", blockNumber);
    }
  }
}
🟡 Medium: This is a direct consequence of moving these calls to blockTrigger.
Suggested change:
  }
  void postBlockTrigger(final BlockCapsule blockCapsule) {
🟠 High: The introduction of version-dependent conditional logic within core Manager methods (switchFork, blockTrigger, processTransaction) is a significant change. Specifically, the logic around lines 1518-1519 (new lines in the diff) where postContractTrigger is conditionally called based on EventPluginLoader.getInstance().getVersion() == 0 adds complexity and tightly couples the core blockchain logic to the event plugin's version. This pattern of spreading version checks throughout critical modules should be carefully reviewed for potential issues during future upgrades, maintainability, and clarity of event processing flow. Consider abstracting this versioning logic to a dedicated strategy or configuration to improve modularity.
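The abstraction this comment asks for could look like the following sketch. All names here are illustrative, not code from this PR: the idea is that the version is consulted exactly once when the strategy is constructed, and Manager only ever calls the strategy, never EventPluginLoader.getVersion().

```java
import java.util.function.LongConsumer;

// Hypothetical strategy interface: Manager calls this instead of branching
// on EventPluginLoader.getInstance().getVersion() at every trigger site.
interface EventDispatchStrategy {
  void onBlockProcessed(long blockNum);
}

// Old behavior: fire the trigger synchronously.
class LegacyDispatch implements EventDispatchStrategy {
  private final LongConsumer trigger;

  LegacyDispatch(LongConsumer trigger) {
    this.trigger = trigger;
  }

  public void onBlockProcessed(long blockNum) {
    trigger.accept(blockNum); // old synchronous trigger path
  }
}

// New behavior: only record progress; the async services do the rest.
class AsyncDispatch implements EventDispatchStrategy {
  long lastUsedSolidityNum;

  public void onBlockProcessed(long blockNum) {
    lastUsedSolidityNum = blockNum;
  }
}

public class DispatchDemo {
  // Chosen once from configuration at startup; core logic never sees the
  // version number again.
  static EventDispatchStrategy forVersion(int version, LongConsumer legacyTrigger) {
    return version == 0 ? new LegacyDispatch(legacyTrigger) : new AsyncDispatch();
  }

  public static void main(String[] args) {
    long[] fired = {0};
    forVersion(0, n -> fired[0] = n).onBlockProcessed(42); // legacy path fires
    forVersion(1, n -> fired[0] = -1).onBlockProcessed(43); // new path does not
    System.out.println(fired[0]); // prints 42
  }
}
```

This keeps the version check in one place, so adding a third event-service version later means adding a strategy, not editing switchFork, blockTrigger, and processTransaction again.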
📋 Review Summary
This PR introduces a new version of the event subscription service, which is a significant feature. The overall structure and lifecycle management of the new services are well-designed. However, there are critical concurrency issues in the BlockEventCache class that need to be addressed to prevent race conditions and data corruption.
🔍 General Feedback
- The use of a configuration version to switch between the old and new event systems is a good approach for a staged rollout.
- The separation of concerns into different services (
HistoryEventService,RealtimeEventService,SolidEventService,BlockEventLoad) is clean. - Unit tests have been added for the new components, which is great.
🔴 Critical Issues
BlockEventCache.java is not thread-safe: This is the most critical issue. The class uses static maps and is accessed by multiple threads without proper synchronization. This will lead to race conditions, especially between BlockEventLoad and SolidEventService. I've left several inline comments with specific suggestions to fix these problems by adding synchronized keywords and using thread-safe collections correctly. All public static methods in BlockEventCache should be synchronized.
solidId = blockId;
List<BlockEvent> list = new ArrayList<>();
list.add(head);
numMap.put(blockId.getNum(), list);
🔴 The numMap field is a ConcurrentHashMap, but the value is List<BlockEvent>, which is an ArrayList. ArrayList is not thread-safe.
In the add method, the code does list.add(blockEvent). If two threads are adding a BlockEvent for the same block number, they will get a reference to the same ArrayList and modify it concurrently. This can lead to race conditions, ConcurrentModificationException, or lost data.
Suggested change:
  private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();
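Beyond the map itself, the per-number value lists also need protection. A minimal sketch of one way to do this (simplified types, not the PR's code): computeIfAbsent makes the create-or-get step atomic on the ConcurrentHashMap, and Collections.synchronizedList guards concurrent add() calls on the same list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NumMapDemo {
  // computeIfAbsent makes create-or-get atomic, and the synchronized wrapper
  // guards concurrent add() calls on the same list
  static final Map<Long, List<String>> numMap = new ConcurrentHashMap<>();

  static void add(long num, String event) {
    numMap.computeIfAbsent(num, k -> Collections.synchronizedList(new ArrayList<>()))
        .add(event);
  }

  public static void main(String[] args) throws InterruptedException {
    Thread[] threads = new Thread[8];
    for (int i = 0; i < threads.length; i++) {
      final int id = i;
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          add(7L, id + ":" + j); // all threads target the same block number
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // with a plain ArrayList value this count is frequently < 8000 or throws
    System.out.println(numMap.get(7L).size()); // prints 8000
  }
}
```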
    + blockEvent.getBlockId().getString() + ", "
    + blockEvent.getParentId().getString());
}
🔴 The add method is not thread-safe. It performs a sequence of check-then-act operations on shared maps (blockEventMap and numMap) without proper synchronization. Multiple threads calling this method concurrently can lead to race conditions.
For example:
- Thread A checks if a parent exists.
- Thread B removes the parent.
- Thread A proceeds to add the block event, leading to an inconsistent state.
The entire add method should be synchronized to ensure atomicity.
Suggested change:
  public static synchronized void add(BlockEvent blockEvent) throws EventException {
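To illustrate why the whole method needs the lock, here is a stripped-down sketch (String keys stand in for block IDs; this is not the PR's code): the parent check and the insert execute under one monitor, so a concurrent remove cannot interleave between them.

```java
import java.util.HashMap;
import java.util.Map;

public class CacheDemo {
  private static final Map<String, String> blockEventMap = new HashMap<>();

  // The parent check and the insert form one atomic unit under the class
  // lock, so a concurrent remove() cannot run between them.
  public static synchronized boolean add(String id, String parentId) {
    if (!blockEventMap.isEmpty() && !blockEventMap.containsKey(parentId)) {
      return false; // orphan: parent was never added or was already pruned
    }
    blockEventMap.put(id, parentId);
    return true;
  }

  public static synchronized void remove(String id) {
    blockEventMap.remove(id);
  }

  public static synchronized int size() {
    return blockEventMap.size();
  }

  public static synchronized void clear() {
    blockEventMap.clear();
  }

  public static void main(String[] args) {
    clear();
    System.out.println(add("genesis", "none"));   // prints true (cache empty)
    System.out.println(add("b1", "genesis"));     // prints true (parent present)
    System.out.println(add("orphan", "missing")); // prints false
    System.out.println(size());                   // prints 2
  }
}
```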
    solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
numMap.forEach((k, v) -> {
  if (k < solidId.getNum()) {
    v.forEach(value -> blockEventMap.remove(value.getBlockId()));
🔴 Removing entries from a ConcurrentHashMap while iterating over it with forEach is only weakly consistent: the lambda passed to forEach calls numMap.remove(k), modifying the map mid-iteration, so concurrent removals may or may not be reflected by the traversal. This makes the cleanup hard to reason about and can lead to missed removals.
A safer approach is to collect the keys to be removed in a separate list and then iterate over that list to remove the entries from the map.
Suggested change:
  public static void remove(BlockCapsule.BlockId solidId) {
    logger.info("Remove solidId {}, solidNum {}, {}, {}",
        solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
    List<Long> keysToRemove = new ArrayList<>();
    numMap.forEach((k, v) -> {
      if (k < solidId.getNum()) {
        v.forEach(value -> blockEventMap.remove(value.getBlockId()));
        keysToRemove.add(k);
      }
    });
    keysToRemove.forEach(numMap::remove);
    solidNum = solidId.getNum();
  }
List<BlockEvent> blockEvents = new ArrayList<>();
BlockCapsule.BlockId tmp = solidId;
while (tmp.getNum() > solidNum) {
  BlockEvent blockEvent = blockEventMap.get(tmp);
🔴 The getSolidBlockEvents method is not synchronized. It reads from blockEventMap and the solidNum field, which can be modified concurrently by the add and remove methods in other threads. This can lead to an inconsistent view of the data, potentially returning a partial or incorrect list of block events.
To ensure thread safety, this method should also be synchronized.
Suggested change:
  public static synchronized List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {
  isRunning = false;
}
private void syncEvent() {
🔴 Starting a new Thread manually is generally discouraged in a Spring application. It's better to use a managed thread pool (e.g., TaskExecutor) to have more control over the thread's lifecycle and resource management.
Consider using a Spring-managed TaskExecutor to run the syncEvent task.
private void syncEvent() {
new Thread(() -> syncEvent()).start();
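A minimal sketch of the managed-executor pattern using plain java.util.concurrent (the project's ExecutorServiceManager wraps the same idea; class and field names here are illustrative): the task is submitted to a single-thread executor, and close() can await termination deterministically, which a bare Thread does not offer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SyncServiceDemo {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean running = new AtomicBoolean(false);

  public void init() {
    running.set(true);
    executor.submit(this::syncEvent); // managed task instead of new Thread()
  }

  private void syncEvent() {
    while (running.get()) {
      // placeholder for the real history-sync loop body
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  // Unlike a bare Thread, shutdown can be awaited with a bounded timeout.
  public boolean close() throws InterruptedException {
    running.set(false);
    executor.shutdown();
    return executor.awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    SyncServiceDemo service = new SyncServiceDemo();
    service.init();
    System.out.println(service.close()); // prints true
  }
}
```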
This PR introduces a significant and well-structured optimization for the event service, gated behind a new configuration version. The changes effectively separate the new event handling logic from the old, allowing for a controlled rollout. The use of dedicated services for different event processing stages (history, realtime, solid) is a good design choice.
🔍 General Feedback
- The overall architecture of the new event service is modular and seems to address the goal of optimizing event handling.
- Unit tests have been added for the new components, which is excellent.
- The use of single-threaded executors in the new services simplifies concurrency management within those components.
- The PR carefully manages backward compatibility by keying the new functionality on a version number in the configuration.
logger.info("History event service start.");
}
Suggested change:
  private final ExecutorService executor = ExecutorServiceManager
      .newSingleThreadExecutor("history-event");
  ...
  public void init() {
    if (instance.getStartSyncBlockNum() <= 0) {
      initEventService(manager.getChainBaseManager().getHeadBlockId());
      return;
    }
    isRunning = true;
    executor.submit(() -> syncEvent());
    logger.info("History event service start.");
  }

  public void close() {
    isRunning = false;
    executor.shutdown();
  }
    l2.add(tmp);
    tmp = BlockEventCache.getBlockEvent(tmp.getParentId());
  }
  blockEvent = blockEventGet.getBlockEvent(tmpNum);
  l1.add(blockEvent);
}
Consider adding a null check for tmp to handle this edge case gracefully.
Suggested change:
  while (!blockEvent.getParentId().equals(tmp.getBlockId())) {
    tmpNum--;
    if (tmpNum == tmp.getBlockId().getNum()) {
      l2.add(tmp);
      tmp = BlockEventCache.getBlockEvent(tmp.getParentId());
      if (tmp == null) {
        // This indicates a deep reorg beyond the cache's depth.
        // The service should probably re-initialize or handle this state.
        logger.error("Deep reorg detected. Block {}'s parent is not in cache.",
            l2.get(l2.size() - 1).getBlockId().getString());
        // Depending on desired behavior, you might want to clear the cache and
        // re-initialize from the last solid block, or simply break here.
        break;
      }
    }
    blockEvent = blockEventGet.getBlockEvent(tmpNum);
    l1.add(blockEvent);
  }
For example, the add method modifies numMap, blockEventMap, head, and solidId without a common lock. If add and remove were to be called from different threads, this could lead to an inconsistent state in the cache.
The current implementation seems to rely on callers (BlockEventLoad and SolidEventService) to provide synchronization. This implicit requirement is fragile. Consider making the synchronization explicit within BlockEventCache by using synchronized on the methods or a dedicated lock object to ensure atomicity of operations. This would make the class more robust and thread-safe on its own.
📋 Review Summary
This PR introduces a new, versioned event subscription service (EventService) intended to optimize event processing by decoupling it from the main block processing path. While the architectural intent of offloading event processing is sound, the implementation introduces critical performance and liveness risks due to incorrect synchronization and potential data loss in the new asynchronous queue.
🔍 General Feedback
- Locking & Liveness (Critical): The BlockEventLoad service synchronizes on the global Manager instance while performing heavy database and parsing operations. This will likely cause severe contention, blocking core node functions like consensus and RPC.
- Data Reliability: The RealtimeEventService uses a bounded queue that silently drops events when full. For a system meant to provide reliable event logs, this is a significant regression compared to the previous synchronous model.
- Thread Management: HistoryEventService spawns an unmanaged thread, which deviates from the project's pattern of using ExecutorService and complicates graceful shutdown.
- Static State: BlockEventCache relies on static mutable state, making it difficult to test and prone to race conditions in a concurrent environment.
PR Review Stability Analysis
1. PR Change Type Classification
Mixed (Modification + Addition)
- Modification: Manager logic for triggering events is altered based on the version config.
- Addition: New EventService ecosystem (RealtimeEventService, HistoryEventService, SolidEventService, BlockEventLoad, BlockEventCache).
2. Lifecycle & Resource Safety
- Issue: HistoryEventService creates an unmanaged thread (new Thread(...).start()) which is not tracked by an ExecutorService.
- Check: ApplicationImpl correctly calls eventService.init() and close(). EventService delegates to sub-services.
3. Executor / Thread Pool Shutdown Semantics
- Checked: Most services (RealtimeEventService, SolidEventService, BlockEventLoad) use ExecutorServiceManager and correctly call shutdown() in their close() methods.
4. Behavioral Compatibility & Implicit Invariants
- Risk: The new system moves from synchronous event posting (in Manager) to an asynchronous polling model (BlockEventLoad polling every 100 ms). This introduces latency and the possibility of queue overflow, changing the implicit guarantee that "processed block = triggered event".
5. Business Logic Correctness
- Risk: The reorganization logic in BlockEventLoad assumes strict parent-child continuity in the cache. If the cache is incomplete or a deep reorg occurs, BlockEventCache.getBlockEvent(tmp.getParentId()) may throw a NullPointerException.
6. Concurrency & Execution Interleaving Risks
- CRITICAL: BlockEventLoad.load() synchronizes on the manager instance. Manager uses synchronized (this) for many critical paths. Holding this lock during heavy BlockEventGet operations (DB I/O, ABI parsing) is a major liveness risk.
- Risk: BlockEventCache uses static mutable state (head, solidId) accessed by multiple threads (BlockEventLoad, SolidEventService). The compound check-then-act operations are not atomic.
7. Numeric Precision & Determinism
- Checked: No significant numeric precision risks found. long is used for block numbers.
8. System-Level Risk Summary
- Performance/Liveness Risk: High. The locking strategy in BlockEventLoad threatens to stall the entire node.
- Data Integrity Risk: High. The RealtimeEventService bounded queue drops events on overflow, leading to incomplete data for subscribers.
- Concurrency Risk: Medium. Static shared state in BlockEventCache and unmanaged threads.
if (cacheHeadNum >= tmpNum) {
  return;
}
synchronized (manager) {
The Manager class often uses synchronized(this) for critical consensus and state operations. Holding this lock for the duration of event processing (which involves iterating transactions, decoding ABI, and multiple DB reads) will block the main node thread, potentially causing:
- Missed block production slots.
- Delayed block processing/verification.
- RPC timeouts.
Action Required:
Do not synchronize on manager. If you need a consistent view of the chain head, retrieve the necessary atomic values (e.g., head block number) and then perform the getBlockEvent processing without holding the lock. If Manager methods are not thread-safe for read access, expose thread-safe read-only views or snapshots.
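A stripped-down sketch of the snapshot approach (placeholder types and names, not the PR's code): the lock is held only long enough to copy the head number, and the expensive loop runs after it is released.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotDemo {
  private final Object manager = new Object(); // stands in for the Manager monitor
  private long headNum = 100;

  List<Long> loadEvents(long cacheHeadNum) {
    long target;
    synchronized (manager) { // held only long enough to copy the head number
      target = headNum;
    }
    // heavy work (DB reads, ABI decoding) happens with the lock released
    List<Long> events = new ArrayList<>();
    for (long n = cacheHeadNum + 1; n <= target; n++) {
      events.add(n); // placeholder for building a BlockEvent per height
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(new SnapshotDemo().loadEvents(97).size()); // prints 3
  }
}
```

The snapshot may be slightly stale by the time the loop finishes, but the loader already tolerates that: it simply picks up the newer blocks on its next poll.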
  logger.warn("Add event failed, blockId {}.", event.getBlockEvent().getBlockId().getString());
  return;
}
queue.offer(event);
If the consumer (work() method) cannot keep up with the producer (block processing), events will be discarded, and the external event subscribers will miss triggers. This breaks the guarantee of reliable event delivery.
Action Required:
- Consider using a blocking put (with backpressure) if event delivery is critical, though this risks blocking the producer.
- Alternatively, use an unbounded queue (monitor memory) or a persistent queue.
- If dropping is intended, it must be explicitly monitored via metrics/logs (which is partially done here), but for a core event service, dropping is usually unacceptable.
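The trade-offs above map directly onto the java.util.concurrent.BlockingQueue API (the capacity here is illustrative): offer() drops silently, put() blocks the producer until space frees up, and offer() with a timeout bounds the stall while still reporting failure.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDemo {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
    queue.put(1);
    queue.put(2); // queue is now full

    // offer: returns false instead of blocking -- this is the silent-drop path
    boolean accepted = queue.offer(3);
    System.out.println(accepted); // prints false

    // bounded wait: blocks up to the timeout, then reports failure so the
    // caller can emit a metric or fall back to persistence
    accepted = queue.offer(3, 50, TimeUnit.MILLISECONDS);
    System.out.println(accepted); // prints false

    // once a consumer drains an element, put() succeeds without data loss
    queue.take();
    queue.put(3);
    System.out.println(queue.size()); // prints 2
  }
}
```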
isRunning = true;
new Thread(() -> syncEvent()).start();
If the application shuts down or if HistoryEventService is closed, this thread relies on isRunning flag and sleep wake-up. While isRunning is checked, unmanaged threads can leak or complicate graceful shutdown (e.g., no awaitTermination).
Suggestion:
Use an ExecutorService (e.g., newSingleThreadExecutor) to manage this task, similar to how RealtimeEventService is implemented. This ensures consistent thread management and shutdown behavior.
private static volatile long solidNum;

@Getter
private static volatile BlockEvent head;

@Getter
private static volatile BlockCapsule.BlockId solidId;

private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>();
More importantly, while ConcurrentHashMap is thread-safe for individual operations, the compound operations in add and remove (checking numMap, updating blockEventMap, updating head/solidId) are not atomic.
For example:
- add checks num > head... and updates head.
- Another thread could intervene.
While BlockEventLoad might be the single writer effectively, SolidEventService calls remove concurrently.
Suggestion:
Make BlockEventCache an instance bean (Singleton) managed by Spring/ServiceContainer rather than a static utility class. Protect shared state with explicit locking or use atomic references for the head/solid pointers if concurrent access is expected.
tmpNum--;
if (tmpNum == tmp.getBlockId().getNum()) {
  l2.add(tmp);
  tmp = BlockEventCache.getBlockEvent(tmp.getParentId());
BlockEventCache.getBlockEvent(tmp.getParentId()) returns null if the parent is not in the cache. If tmp refers to the oldest block in the cache, tmp.getParentId() might not be present.
Then tmp = null, and the next iteration's tmp.getBlockId() (implicit or explicit) or blockEvent.getParentId().equals(...) check will throw NPE.
Action Required:
Add a null check for tmp after retrieval. If tmp is null, deciding whether to break the loop or fetch from DB is necessary.
What does this PR do?
Optimize the event service, refer to issue tronprotocol#6153
Why are these changes required?
This PR has been tested by:
Follow up
Extra details