
Conversation

@0xbigapple (Owner)

What does this PR do?
Optimize the event service; see issue tronprotocol#6153.

Why are these changes required?

This PR has been tested by:

  • Unit Tests
  • Manual Testing

Follow up

Extra details



@github-actions bot left a comment

📋 Review Summary

This Pull Request introduces a new event service architecture to optimize event handling, addressing issue tronprotocol#6153. It involves significant refactoring of event-related logic, the addition of new services, and new unit tests. The changes aim to improve the flexibility and efficiency of event processing within the system.

🔍 General Feedback

  • The new event service components (BlockEventCache, BlockEventGet, BlockEventLoad, EventService, HistoryEventService, RealtimeEventService, SolidEventService) are well-structured and introduce a more modular approach to event handling.
  • Comprehensive unit tests have been added for the new event service components, which is a positive aspect for ensuring code quality and correctness.
  • The use of versioning in the EventPluginLoader and EventService is noted. While it provides a mechanism for phased rollout, care must be taken to manage these version dependencies, especially within core Manager logic, to prevent increased complexity and potential for tightly coupled modules.
  • Several minor style and magic number issues were identified, which can be addressed to further improve code quality and maintainability.


@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())

🟢 The change in toString() methods improves readability and consistency.

Suggested change
return new StringBuilder().append("triggerName: ").append(getTriggerName())
    .append(", timestamp: ")


@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())

🟢 The change in toString() methods improves readability and consistency.

Suggested change
return new StringBuilder().append("triggerName: ").append(getTriggerName())
    .append(", timestamp: ")

Comment on lines 1113 to +1115

.getLatestBlockHeaderHash()
.equals(binaryTree.getValue().peekLast().getParentHash())) {
reOrgContractTrigger();
if (EventPluginLoader.getInstance().getVersion() == 0) {

🟠 High: This conditional logic introduces a dependency on the EventPluginLoader version within a core Manager class. While it might be for a phased rollout, it adds complexity and potential for unexpected behavior if versioning is not robustly managed throughout the system. Consider if this logic can be abstracted or configured more cleanly to avoid direct version checks in core logic.

Suggested change
if (EventPluginLoader.getInstance().getVersion() == 0) {
  reOrgContractTrigger();
}

Comment on lines 1376 to +1383

}

void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
// post block and logs for jsonrpc
try {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(block, false);
postLogsFilter(block, false, false);

🟠 High: Similar to the above, embedding version-dependent logic directly in blockTrigger can make the event processing flow harder to reason about and maintain. The current implementation essentially bypasses the old trigger mechanism for version != 0 and just updates lastUsedSolidityNum. This is a significant change in behavior based on a version flag.

Suggested change
void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
  // post block and logs for jsonrpc
  try {
    if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
      postBlockFilter(block, false);
      postLogsFilter(block, false, false);
    }
    if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
      postSolidityFilter(oldSolid, newSolid);
    }
    if (EventPluginLoader.getInstance().getVersion() != 0) {
      lastUsedSolidityNum = newSolid;
      return;
    }

if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {

🟡 Medium: The removal of oldSolidNum as a parameter suggests it's no longer needed for postSolidityTrigger. This is an API change. Ensure that oldSolidNum is genuinely not required and that all callers are updated.

Suggested change
if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
  postSolidityTrigger(newSolid);

🟢 Low: No newline at end of file.

🟢 Low: No newline at end of file.

for (BlockCapsule blockCapsule : capsuleList) {
postBlockFilter(blockCapsule, true);
postLogsFilter(blockCapsule, true, false);
}

🟡 Medium: Consistent with the previous change where oldSolidNum was removed. The logic related to postSolidityFilter has been moved to blockTrigger.

Suggested change
}
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {

logger.info("Too many filters, logs filter lost: {}.", blockNumber);
}
}
}

🟡 Medium: This is a direct consequence of moving these calls to blockTrigger.

Suggested change
}
void postBlockTrigger(final BlockCapsule blockCapsule) {

🟠 High: The introduction of version-dependent conditional logic within core Manager methods (switchFork, blockTrigger, processTransaction) is a significant change. Specifically, the logic around lines 1518-1519 (new lines in the diff) where postContractTrigger is conditionally called based on EventPluginLoader.getInstance().getVersion() == 0 adds complexity and tightly couples the core blockchain logic to the event plugin's version. This pattern of spreading version checks throughout critical modules should be carefully reviewed for potential issues during future upgrades, maintainability, and clarity of event processing flow. Consider abstracting this versioning logic to a dedicated strategy or configuration to improve modularity.
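The abstraction this comment asks for can be sketched as a small strategy interface resolved once at startup, so Manager never branches on the plugin version itself. This is a hypothetical sketch: TriggerStrategy and the class names below are illustrative and not part of the PR.

```java
// Hypothetical sketch: hide the version branch behind a strategy chosen once
// at startup. All names here are illustrative, not taken from the PR.
interface TriggerStrategy {
  void onReorg(Runnable reOrgContractTrigger);
}

// Legacy path (version == 0): fire the old trigger directly.
class LegacyTriggerStrategy implements TriggerStrategy {
  public void onReorg(Runnable reOrgContractTrigger) {
    reOrgContractTrigger.run();
  }
}

// New path (version != 0): the event services replay the fork themselves.
class EventServiceTriggerStrategy implements TriggerStrategy {
  public void onReorg(Runnable reOrgContractTrigger) {
    // intentionally a no-op
  }
}

public class TriggerStrategyDemo {
  // Resolved once from configuration, then injected into Manager.
  static TriggerStrategy select(int version) {
    return version == 0 ? new LegacyTriggerStrategy() : new EventServiceTriggerStrategy();
  }
}
```

Manager would then call strategy.onReorg(this::reOrgContractTrigger) with no knowledge of the version flag.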

@0xbigapple closed this Jan 23, 2026
@0xbigapple reopened this Jan 23, 2026

@github-actions bot left a comment

📋 Review Summary

This PR introduces a new version of the event subscription service, which is a significant feature. The overall structure and lifecycle management of the new services are well-designed. However, there are critical concurrency issues in the BlockEventCache class that need to be addressed to prevent race conditions and data corruption.

🔍 General Feedback

  • The use of a configuration version to switch between the old and new event systems is a good approach for a staged rollout.
  • The separation of concerns into different services (HistoryEventService, RealtimeEventService, SolidEventService, BlockEventLoad) is clean.
  • Unit tests have been added for the new components, which is great.

🔴 Critical Issues

  • BlockEventCache.java is not thread-safe: This is the most critical issue. The class uses static maps and is accessed by multiple threads without proper synchronization. This will lead to race conditions, especially between BlockEventLoad and SolidEventService. I've left several inline comments with specific suggestions to fix these problems by adding synchronized keywords and using thread-safe collections correctly. All public static methods in BlockEventCache should be synchronized.
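A minimal illustration of that fix, as a simplified stand-in for BlockEventCache (String values instead of BlockEvent): one private lock guards every compound operation, so add, remove, and reads always see both maps in a consistent state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for BlockEventCache (String values instead of
// BlockEvent): one private lock guards every compound operation, so
// add/removeBelow/size always see both maps in a consistent state.
public class SafeBlockEventCache {
  private static final Object LOCK = new Object();
  private static final Map<String, String> blockEventMap = new HashMap<>();
  private static final Map<Long, List<String>> numMap = new HashMap<>();

  public static void add(long num, String blockId) {
    synchronized (LOCK) {
      blockEventMap.put(blockId, blockId);
      numMap.computeIfAbsent(num, k -> new ArrayList<>()).add(blockId);
    }
  }

  public static void removeBelow(long solidNum) {
    synchronized (LOCK) {
      // collect first, then remove: no structural change while iterating
      List<Long> stale = new ArrayList<>();
      for (Map.Entry<Long, List<String>> e : numMap.entrySet()) {
        if (e.getKey() < solidNum) {
          e.getValue().forEach(blockEventMap::remove);
          stale.add(e.getKey());
        }
      }
      stale.forEach(numMap::remove);
    }
  }

  public static int size() {
    synchronized (LOCK) {
      return blockEventMap.size();
    }
  }
}
```

With a single lock, plain HashMaps suffice; the ConcurrentHashMap in the PR then only adds safety for any readers that bypass the lock.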

solidId = blockId;
List<BlockEvent> list = new ArrayList<>();
list.add(head);
numMap.put(blockId.getNum(), list);

🔴 The numMap field is a ConcurrentHashMap, but the value is List<BlockEvent>, which is an ArrayList. ArrayList is not thread-safe.

In the add method, the code does list.add(blockEvent). If two threads are adding a BlockEvent for the same block number, they will get a reference to the same ArrayList and modify it concurrently. This can lead to race conditions, ConcurrentModificationException, or lost data.

Suggested change
List<BlockEvent> list = Collections.synchronizedList(new ArrayList<>());
list.add(head);
numMap.put(blockId.getNum(), list);
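The get-or-create race this comment describes can also be closed with ConcurrentHashMap.computeIfAbsent plus a thread-safe list. A simplified sketch (NumMapDemo is hypothetical, with String standing in for BlockEvent):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: computeIfAbsent makes get-or-create atomic, and the
// per-number list is a synchronized wrapper, so concurrent add() calls for
// the same block number cannot corrupt the list.
public class NumMapDemo {
  private static final Map<Long, List<String>> numMap = new ConcurrentHashMap<>();

  public static void add(long num, String event) {
    numMap.computeIfAbsent(num, k -> Collections.synchronizedList(new ArrayList<>()))
        .add(event);
  }

  public static int count(long num) {
    List<String> list = numMap.get(num);
    return list == null ? 0 : list.size();
  }
}
```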

+ blockEvent.getBlockId().getString() + ", "
+ blockEvent.getParentId().getString());
}

🔴 The add method is not thread-safe. It performs a sequence of check-then-act operations on shared maps (blockEventMap and numMap) without proper synchronization. Multiple threads calling this method concurrently can lead to race conditions.

For example:

  1. Thread A checks if a parent exists.
  2. Thread B removes the parent.
  3. Thread A proceeds to add the block event, leading to an inconsistent state.

The entire add method should be synchronized to ensure atomicity.

Suggested change
public static synchronized void add(BlockEvent blockEvent) throws EventException {

solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
numMap.forEach((k, v) -> {
if (k < solidId.getNum()) {
v.forEach(value -> blockEventMap.remove(value.getBlockId()));

🔴 Removing from a ConcurrentHashMap while iterating over it with forEach has undefined behavior if the map is modified during the iteration. The lambda passed to forEach calls numMap.remove(k), which modifies the map during iteration. This can lead to unpredictable behavior, including missed removals or exceptions.

A safer approach is to collect the keys to be removed in a separate list and then iterate over that list to remove the entries from the map.

Suggested change
public static void remove(BlockCapsule.BlockId solidId) {
  logger.info("Remove solidId {}, solidNum {}, {}, {}",
      solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
  List<Long> keysToRemove = new ArrayList<>();
  numMap.forEach((k, v) -> {
    if (k < solidId.getNum()) {
      v.forEach(value -> blockEventMap.remove(value.getBlockId()));
      keysToRemove.add(k);
    }
  });
  keysToRemove.forEach(numMap::remove);
  solidNum = solidId.getNum();
}

List<BlockEvent> blockEvents = new ArrayList<>();
BlockCapsule.BlockId tmp = solidId;
while (tmp.getNum() > solidNum) {
BlockEvent blockEvent = blockEventMap.get(tmp);

🔴 The getSolidBlockEvents method is not synchronized. It reads from blockEventMap and the solidNum field, which can be modified concurrently by the add and remove methods in other threads. This can lead to an inconsistent view of the data, potentially returning a partial or incorrect list of block events.

To ensure thread safety, this method should also be synchronized.

Suggested change
public static synchronized List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {

isRunning = false;
}

private void syncEvent() {

🔴 Starting a new Thread manually is generally discouraged in a Spring application. It's better to use a managed thread pool (e.g., TaskExecutor) to have more control over the thread's lifecycle and resource management.

Consider using a Spring-managed TaskExecutor to run the syncEvent task.

Suggested change
taskExecutor.execute(() -> syncEvent());

@0xbigapple closed this Jan 23, 2026
@0xbigapple reopened this Jan 23, 2026

@github-actions bot left a comment

📋 Review Summary

This PR introduces a significant and well-structured optimization for the event service, gated behind a new configuration version. The changes effectively separate the new event handling logic from the old, allowing for a controlled rollout. The use of dedicated services for different event processing stages (history, realtime, solid) is a good design choice.

🔍 General Feedback

  • The overall architecture of the new event service is modular and seems to address the goal of optimizing event handling.
  • Unit tests have been added for the new components, which is excellent.
  • The use of single-threaded executors in the new services simplifies concurrency management within those components.
  • The PR carefully manages backward compatibility by keying the new functionality on a version number in the configuration.

Comment on lines +44 to +46
logger.info("History event service start.");
}

🟡 For better lifecycle management and resource control, consider using a managed `ExecutorService` instead of creating a raw `Thread`. This would align with the pattern used in the other new services (`BlockEventLoad`, `RealtimeEventService`, `SolidEventService`) and provide more robust control over the thread's lifecycle.
Suggested change
private final ExecutorService executor = ExecutorServiceManager
    .newSingleThreadExecutor("history-event");
...
public void init() {
  if (instance.getStartSyncBlockNum() <= 0) {
    initEventService(manager.getChainBaseManager().getHeadBlockId());
    return;
  }
  isRunning = true;
  executor.submit(() -> syncEvent());
  logger.info("History event service start.");
}

public void close() {
  isRunning = false;
  executor.shutdown();
}

Comment on lines +69 to +75
l2.add(tmp);
tmp = BlockEventCache.getBlockEvent(tmp.getParentId());
}
blockEvent = blockEventGet.getBlockEvent(tmpNum);
l1.add(blockEvent);
}

🟠 There's a potential `NullPointerException` here if a chain reorganization is deeper than the events stored in `BlockEventCache`. The `while` loop traverses blocks backwards using `tmp = BlockEventCache.getBlockEvent(tmp.getParentId())`. If `getParentId()` refers to a block that is no longer in the cache, `BlockEventCache.getBlockEvent()` will return `null`. The next iteration's access to `tmp.getBlockId()` would then throw a `NullPointerException`, crashing the "event-load" thread and halting event processing.

Consider adding a null check for tmp to handle this edge case gracefully.

Suggested change
while (!blockEvent.getParentId().equals(tmp.getBlockId())) {
  tmpNum--;
  if (tmpNum == tmp.getBlockId().getNum()) {
    l2.add(tmp);
    tmp = BlockEventCache.getBlockEvent(tmp.getParentId());
    if (tmp == null) {
      // This indicates a deep reorg beyond the cache's depth.
      // The service should probably re-initialize or handle this state.
      logger.error("Deep reorg detected. Block {}'s parent is not in cache.",
          l2.get(l2.size() - 1).getBlockId().getString());
      // Depending on desired behavior, you might want to clear the cache and
      // re-initialize from the last solid block, or simply break here.
      break;
    }
  }
  blockEvent = blockEventGet.getBlockEvent(tmpNum);
  l1.add(blockEvent);
}

🟡 `BlockEventCache` is implemented as a global singleton with static fields. While the use of `ConcurrentHashMap` helps with individual map operations, the methods `add`, `remove`, and `init` perform multiple operations across different fields (`solidNum`, `head`, `solidId`, `blockEventMap`, `numMap`) that are not atomic as a whole.

For example, the add method modifies numMap, blockEventMap, head, and solidId without a common lock. If add and remove were to be called from different threads, this could lead to an inconsistent state in the cache.

The current implementation seems to rely on callers (BlockEventLoad and SolidEventService) to provide synchronization. This implicit requirement is fragile. Consider making the synchronization explicit within BlockEventCache by using synchronized on the methods or a dedicated lock object to ensure atomicity of operations. This would make the class more robust and thread-safe on its own.

@0xbigapple closed this Jan 23, 2026
@0xbigapple reopened this Jan 23, 2026

@github-actions bot left a comment

📋 Review Summary

This PR introduces a new, versioned event subscription service (EventService) intended to optimize event processing by decoupling it from the main block processing path. While the architectural intent of offloading event processing is sound, the implementation introduces critical performance and liveness risks due to incorrect synchronization and potential data loss in the new asynchronous queue.

🔍 General Feedback

  • Locking & Liveness (Critical): The BlockEventLoad service synchronizes on the global Manager instance while performing heavy database and parsing operations. This will likely cause severe contention, blocking core node functions like consensus and RPC.
  • Data Reliability: The RealtimeEventService uses a bounded queue that silently drops events when full. For a system meant to provide reliable event logs, this is a significant regression compared to the previous synchronous model.
  • Thread Management: HistoryEventService spawns an unmanaged thread, which deviates from the project's pattern of using ExecutorService and complicates graceful shutdown.
  • Static State: BlockEventCache relies on static mutable state, making it difficult to test and prone to race conditions in a concurrent environment.

PR Review Stability Analysis

1. PR Change Type Classification

Mixed (Modification + Addition)

  • Modification: Manager logic for triggering events is altered based on version config.
  • Addition: New EventService ecosystem (Realtime, History, Solid, BlockEventLoad, BlockEventCache).

2. Lifecycle & Resource Safety

  • Issue: HistoryEventService creates an unmanaged thread (new Thread(...).start()) which is not tracked by an ExecutorService.
  • Check: ApplicationImpl correctly calls eventService.init() and close(). EventService delegates to sub-services.

3. Executor / Thread Pool Shutdown Semantics

  • Checked: Most services (RealtimeEventService, SolidEventService, BlockEventLoad) use ExecutorServiceManager and correctly call shutdown() in their close() methods.

4. Behavioral Compatibility & Implicit Invariants

  • Risk: The new system moves from synchronous event posting (in Manager) to an asynchronous polling model (BlockEventLoad polling every 100ms). This introduces latency and the possibility of queue overflow, changing the implicit guarantee that "processed block = triggered event".

5. Business Logic Correctness

  • Risk: The re-organization logic in BlockEventLoad assumes strict parent-child continuity in the cache. If the cache is incomplete or a deep reorg occurs, BlockEventCache.getBlockEvent(tmp.getParentId()) may throw a NullPointerException.

6. Concurrency & Execution Interleaving Risks

  • CRITICAL: BlockEventLoad.load() synchronizes on the manager instance. Manager uses synchronized(this) for many critical paths. Holding this lock during heavy BlockEventGet operations (DB I/O, ABI parsing) is a major liveness risk.
  • Risk: BlockEventCache uses static mutable state (head, solidId) accessed by multiple threads (BlockEventLoad, SolidEventService). The compound check-then-act operations are not atomic.

7. Numeric Precision & Determinism

  • Checked: No significant numeric precision risks found. long is used for block numbers.

8. System-Level Risk Summary

  • Performance/Liveness Risk: High. The locking strategy in BlockEventLoad threatens to stall the entire node.
  • Data Integrity Risk: High. The RealtimeEventService bounded queue drops events on overflow, leading to incomplete data for subscribers.
  • Concurrency Risk: Medium. Static shared state in BlockEventCache and unmanaged threads.

if (cacheHeadNum >= tmpNum) {
return;
}
synchronized (manager) {

🔴 Locking the `Manager` instance while performing heavy database I/O and log parsing in `blockEventGet.getBlockEvent(tmpNum)` is a critical performance and liveness risk.

The Manager class often uses synchronized(this) for critical consensus and state operations. Holding this lock for the duration of event processing (which involves iterating transactions, decoding ABI, and multiple DB reads) will block the main node thread, potentially causing:

  1. Missed block production slots.
  2. Delayed block processing/verification.
  3. RPC timeouts.

Action Required:
Do not synchronize on manager. If you need a consistent view of the chain head, retrieve the necessary atomic values (e.g., head block number) first, then perform the getBlockEvent processing without holding the lock. If Manager methods are not thread-safe for read access, expose thread-safe read-only views or snapshots.
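The snapshot-then-process pattern suggested here can be sketched as follows. The class and field names are hypothetical; the real code would read the head block number from Manager/ChainBaseManager.

```java
// Sketch of "snapshot under the lock, heavy work outside it". The lock and
// field names are illustrative; the real code would read the head block
// number from Manager/ChainBaseManager.
public class SnapshotDemo {
  private final Object managerLock = new Object();
  private long headBlockNum = 100L;

  public long snapshotHead() {
    synchronized (managerLock) {   // held only for this cheap read
      return headBlockNum;
    }
  }

  public String loadEvents(long upTo) {
    // the expensive part (DB I/O, ABI parsing) runs with no lock held
    return "events up to " + upTo;
  }

  public String process() {
    long head = snapshotHead();    // short critical section
    return loadEvents(head);       // long work, lock-free
  }
}
```

The critical section shrinks from the whole event-extraction pass to a single field read, so block production and RPC threads contending for the Manager lock are no longer blocked behind event processing.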

logger.warn("Add event failed, blockId {}.", event.getBlockEvent().getBlockId().getString());
return;
}
queue.offer(event);

🟠 The use of a bounded queue with a "drop on full" policy (`queue.offer`) introduces silent data loss under load.

If the consumer (work() method) cannot keep up with the producer (block processing), events will be discarded, and the external event subscribers will miss triggers. This breaks the guarantee of reliable event delivery.

Action Required:

  1. Consider using a blocking put (with backpressure) if event delivery is critical, though this risks blocking the producer.
  2. Alternatively, use an unbounded queue (monitor memory) or a persistent queue.
  3. If dropping is intended, it must be explicitly monitored via metrics/logs (which is partially done here), but for a core event service, dropping is usually unacceptable.
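Options 1 and 3 can be combined: a bounded queue whose producer blocks briefly for backpressure and explicitly counts drops for monitoring. A simplified, hypothetical sketch, not the PR's RealtimeEventService:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch combining a brief blocking offer (backpressure) with an
// explicit drop counter for monitoring. Not the PR's RealtimeEventService.
public class EventQueueDemo {
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
  private final AtomicLong dropped = new AtomicLong();

  public boolean publish(String event) throws InterruptedException {
    // wait briefly for the consumer before giving up, then record the loss
    if (!queue.offer(event, 50, TimeUnit.MILLISECONDS)) {
      dropped.incrementAndGet();
      return false;
    }
    return true;
  }

  public long droppedCount() {
    return dropped.get();
  }
}
```

The timed offer gives a slow consumer a short window to catch up without stalling the producer indefinitely, and the counter makes any remaining loss visible to metrics instead of silent.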


isRunning = true;

new Thread(() -> syncEvent()).start();

🟡 This creates an unmanaged thread that is not tied to the application's lifecycle or an `ExecutorService`.

If the application shuts down or HistoryEventService is closed, this thread relies only on the isRunning flag and sleep wake-ups to exit. While isRunning is checked, unmanaged threads can leak or complicate graceful shutdown (e.g., there is no awaitTermination).

Suggestion:
Use an ExecutorService (e.g., newSingleThreadExecutor) to manage this task, similar to how RealtimeEventService is implemented. This ensures consistent thread management and shutdown behavior.
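A minimal shape for that suggestion, using plain java.util.concurrent.Executors (the project would presumably use its ExecutorServiceManager instead); all names here are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of HistoryEventService's thread handling with a managed
// executor instead of a raw Thread. Class and method names are illustrative.
public class HistorySyncDemo {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean isRunning = new AtomicBoolean();

  public void init() {
    isRunning.set(true);
    executor.submit(this::syncEvent);
  }

  private void syncEvent() {
    while (isRunning.get()) {
      // replay history events here; this demo exits immediately
      return;
    }
  }

  public boolean running() {
    return isRunning.get();
  }

  public void close() throws InterruptedException {
    isRunning.set(false);
    executor.shutdown();
    // unlike a raw Thread, the executor supports awaitTermination
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```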

Comment on lines +17 to +25
private static volatile long solidNum;

@Getter
private static volatile BlockEvent head;

@Getter
private static volatile BlockCapsule.BlockId solidId;

private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>();

🟡 The reliance on `static` mutable state (`blockEventMap`, `numMap`, `head`, `solidId`) complicates testing and isolation.

More importantly, while ConcurrentHashMap is thread-safe for individual operations, the compound operations in add and remove (checking numMap, updating blockEventMap, updating head/solidId) are not atomic.

For example:

  1. add checks num > head... and updates head.
  2. Another thread could intervene.

While BlockEventLoad might be the single writer effectively, SolidEventService calls remove concurrently.

Suggestion:
Make BlockEventCache an instance bean (Singleton) managed by Spring/ServiceContainer rather than a static utility class. Protect shared state with explicit locking or use atomic references for the head/solid pointers if concurrent access is expected.

tmpNum--;
if (tmpNum == tmp.getBlockId().getNum()) {
l2.add(tmp);
tmp = BlockEventCache.getBlockEvent(tmp.getParentId());

🟡 Potential `NullPointerException`.

BlockEventCache.getBlockEvent(tmp.getParentId()) returns null if the parent is not in the cache. If tmp refers to the oldest block in the cache, tmp.getParentId() might not be present.

Then tmp = null, and the next iteration's tmp.getBlockId() (implicit or explicit) or blockEvent.getParentId().equals(...) check will throw NPE.

Action Required:
Add a null check for tmp after retrieval. If tmp is null, decide whether to break the loop or fall back to fetching the block from the database.
