-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsync.js
More file actions
343 lines (284 loc) · 9.8 KB
/
sync.js
File metadata and controls
343 lines (284 loc) · 9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
require('dotenv').config();
const mongoose = require('mongoose');
const axios = require('axios');
const Block = require('./models/Block');
const Transaction = require('./models/Transaction');
const Stats = require('./models/Stats');
// S256 RPC and database settings. Every value is read from the environment
// and falls back to a local default when the variable is unset or empty.
const { env } = process;
const RPC_USER = env.RPC_USER || 'user';
const RPC_PASSWORD = env.RPC_PASSWORD || 'password';
const RPC_HOST = env.RPC_HOST || '127.0.0.1';
const RPC_PORT = env.RPC_PORT || '25332';
const MONGODB_URI = env.MONGODB_URI || 'mongodb://localhost:27017/s256explorer';

// Endpoint used for every JSON-RPC request; the credentials are carried in
// the userinfo part of the URL.
const rpcCredentials = `${RPC_USER}:${RPC_PASSWORD}`;
const rpcAuthority = `${RPC_HOST}:${RPC_PORT}`;
const RPC_URL = `http://${rpcCredentials}@${rpcAuthority}`;

// Module state: `isSyncing` guards monitorNewBlocks against overlapping runs,
// `currentHeight` records the chain height reached by the last completed sync.
let isSyncing = false;
let currentHeight = -1;
/**
 * Sends one JSON-RPC request to the node at RPC_URL and returns its result.
 *
 * @param {string} method - RPC method name.
 * @param {Array} [params=[]] - Positional parameters for the method.
 * @returns {Promise<*>} The `result` member of the JSON-RPC response.
 * @throws {Error} The axios error (rethrown unchanged) when the HTTP request
 *   fails, or an Error carrying the node's message and `code` when the
 *   response body contains a JSON-RPC `error` object.
 */
async function rpcCall(method, params = []) {
  try {
    const response = await axios.post(
      RPC_URL,
      { jsonrpc: '1.0', id: 'sync', method, params },
      { headers: { 'Content-Type': 'application/json' }, timeout: 30000 }
    );

    const { result, error: rpcError } = response.data;
    if (rpcError) {
      // A response can be delivered successfully and still report a failure
      // in its body; surface it instead of returning a null result.
      const failure = new Error(rpcError.message || `RPC ${method} failed`);
      failure.code = rpcError.code;
      throw failure;
    }
    return result;
  } catch (error) {
    // When the node answered with an error status, its JSON-RPC error message
    // is more useful than the generic HTTP failure text.
    const body = error.response && error.response.data;
    const detail = body && body.error && body.error.message;
    console.error(
      `RPC Error (${method}):`,
      detail ? `${error.message} - ${detail}` : error.message
    );
    throw error;
  }
}
/**
 * Opens the mongoose connection to MONGODB_URI.
 *
 * A connection failure is treated as fatal: the error is logged and the
 * process exits with code 1.
 *
 * @returns {Promise<void>}
 */
async function connectDB() {
  try {
    await mongoose.connect(MONGODB_URI);
    console.log('Connected to MongoDB');
  } catch (connectionError) {
    console.error('MongoDB connection error:', connectionError);
    process.exit(1);
  }
}
/**
 * Attaches the spent output (`prevout`) to each transaction input.
 *
 * Coinbase inputs are passed through untouched. For every other input the
 * previous transaction is looked up over RPC and `vout[input.vout]` is stored
 * as `prevout` on a copy of the input. Lookups are cached for the duration of
 * the call, so a transaction spending several outputs of the same parent
 * costs a single RPC request. A failed lookup is logged and the input is
 * returned without `prevout`; it never aborts the whole call.
 *
 * @param {Array<Object>} vin - Inputs of one transaction.
 * @returns {Promise<Array<Object>>} Inputs in the original order.
 */
async function fetchPrevouts(vin) {
  const prevTxCache = new Map();
  const vinWithPrevout = [];

  for (const input of vin) {
    if (input.coinbase) {
      // Coinbase transaction has no prevout
      vinWithPrevout.push(input);
      continue;
    }

    try {
      // Only successful lookups are cached, so a failed one is retried for
      // the next input that references the same transaction.
      if (!prevTxCache.has(input.txid)) {
        prevTxCache.set(input.txid, await rpcCall('getrawtransaction', [input.txid, true]));
      }
      const prevTx = prevTxCache.get(input.txid);
      vinWithPrevout.push({
        ...input,
        prevout: prevTx.vout[input.vout]
      });
    } catch (error) {
      console.error(`Failed to fetch prevout for ${input.txid}:${input.vout}:`, error.message);
      vinWithPrevout.push(input);
    }
  }

  return vinWithPrevout;
}
/**
 * Synchronises the block at `height` from the node into MongoDB.
 *
 * The block hash is resolved first and checked against the database, so a
 * block that is already stored (same hash, same height, not orphaned) is
 * skipped without downloading the full block. Otherwise the block is fetched
 * with verbosity 2, each transaction is upserted with its inputs enriched by
 * fetchPrevouts, and finally the block itself is upserted with `tx` reduced
 * to the list of txids.
 *
 * @param {number} height - Height of the block to sync.
 * @returns {Promise<void>}
 * @throws {Error} Any RPC or database error, after it has been logged.
 */
async function syncBlock(height) {
  try {
    const blockHash = await rpcCall('getblockhash', [height]);

    // Check if this block hash already exists and is NOT an orphan before
    // paying for the full block download.
    const existingBlock = await Block.findOne({ hash: blockHash, isOrphan: false });
    if (existingBlock && existingBlock.height === height) {
      return;
    }

    console.log(`🔄 Syncing block ${height} (${blockHash})...`);
    const blockData = await rpcCall('getblock', [blockHash, 2]);

    // Process transactions
    const txids = [];
    for (const tx of blockData.tx) {
      // Entries may be bare txids instead of decoded transactions.
      const txData = typeof tx === 'string' ? await rpcCall('getrawtransaction', [tx, true]) : tx;

      // Fetch prevout data for inputs (to track sent amounts)
      if (txData.vin && txData.vin.length > 0) {
        txData.vin = await fetchPrevouts(txData.vin);
      }

      await Transaction.findOneAndUpdate(
        { txid: txData.txid },
        {
          ...txData,
          blockheight: height,
          blockhash: blockHash,
          isOrphan: false // Ensure it's not marked as orphan if we're re-syncing
        },
        { upsert: true, new: true }
      );
      txids.push(txData.txid);
    }

    // The block is written last, so a failure part-way through leaves no
    // block document and the height is retried on the next run.
    await Block.findOneAndUpdate(
      { hash: blockHash },
      {
        ...blockData,
        tx: txids,
        isOrphan: false
      },
      { upsert: true, new: true }
    );
    console.log(`✅ Synced block ${height} (${txids.length} transactions)`);
  } catch (error) {
    console.error(`❌ Error syncing block ${height}:`, error.message);
    throw error;
  }
}
/**
 * Looks for a chain reorganisation and rolls the stored chain back to the
 * fork point.
 *
 * Starting at the highest non-orphan block in the database, each stored block
 * hash is compared with the hash the node reports for that height. Blocks
 * that no longer match (and their transactions) are flagged as orphans with
 * `confirmations: -1`. The walk stops at the first matching block, or after
 * moving more than 100 heights below the starting tip.
 *
 * @returns {Promise<number>} Height at which the walk stopped (the common
 *   ancestor when one was found), or -1 when the database holds no non-orphan
 *   block, when every stored block was orphaned, or when an error occurred.
 */
async function checkForReorg() {
  try {
    // Highest block the database still considers part of the main chain.
    const tip = await Block.findOne({ isOrphan: false }).sort({ height: -1 });
    if (!tip) {
      return -1;
    }

    let height = tip.height;
    let sawMismatch = false;

    // Walk down from the tip until the database and the node agree.
    while (height >= 0) {
      const stored = await Block.findOne({ height, isOrphan: false });
      if (!stored) {
        // Nothing stored at this height; keep walking down.
        height -= 1;
        continue;
      }

      const nodeHash = await rpcCall('getblockhash', [height]);
      if (stored.hash === nodeHash) {
        // Common ancestor reached.
        if (sawMismatch) {
          console.log(`🔗 Fork point found at height ${height}. Continuing sync from ${height + 1}.`);
        }
        break;
      }

      // The stored block is no longer on the node's active chain.
      sawMismatch = true;
      console.log(`⚠️ Reorg detected at height ${height}! DB hash: ${stored.hash}, RPC hash: ${nodeHash}`);
      await Block.updateOne({ hash: stored.hash }, { isOrphan: true, confirmations: -1 });
      await Transaction.updateMany({ blockhash: stored.hash }, { isOrphan: true, confirmations: -1 });
      height -= 1;

      // Safety limit: stop once more than 100 heights have been rolled back.
      if (tip.height - height > 100) {
        console.warn('⚠️ Deep reorg detected (>100 blocks). Manual intervention might be needed.');
        break;
      }
    }

    return height;
  } catch (error) {
    console.error('❌ Reorg check error:', error.message);
    return -1;
  }
}
/**
 * Catches the database up with the node's current chain tip.
 *
 * Runs a reorg check, reads the chain height from `getblockchaininfo`, then
 * syncs every height from the block after the highest stored non-orphan
 * block (or from 0 when none is stored) up to that chain height. On success
 * the module-level `currentHeight` is set to the chain height.
 *
 * @returns {Promise<void>}
 * @throws {Error} Any error raised while syncing, after it has been logged.
 */
async function initialSync() {
  try {
    console.log('🔄 Starting initial sync...');

    // 1. Check for reorg first
    await checkForReorg();

    // 2. Get current blockchain height
    const blockchainInfo = await rpcCall('getblockchaininfo');
    const chainHeight = blockchainInfo.blocks;

    // 3. Get highest synced block
    const lastBlock = await Block.findOne({ isOrphan: false }).sort({ height: -1 });
    const startHeight = lastBlock ? lastBlock.height + 1 : 0;

    // Clamp at zero: the database can be ahead of a node that is still
    // catching up, which would otherwise print a negative count.
    const totalBlocks = Math.max(chainHeight - startHeight + 1, 0);

    console.log(`📊 Chain height: ${chainHeight}`);
    console.log(`📊 Last synced: ${lastBlock ? lastBlock.height : 'none'}`);
    console.log(`📊 Blocks to sync: ${totalBlocks}`);

    // Sync missing blocks
    for (let height = startHeight; height <= chainHeight; height++) {
      await syncBlock(height);

      // Progress update every 10 blocks
      if (height % 10 === 0) {
        // The block at `height` is already stored here, so it counts as done.
        const syncedBlocks = height - startHeight + 1;
        const progress = ((syncedBlocks / totalBlocks) * 100).toFixed(2);
        console.log(`📈 Progress: ${progress}% (${height}/${chainHeight})`);
      }
    }

    currentHeight = chainHeight;
    console.log('✅ Initial sync complete!');
  } catch (error) {
    console.error('❌ Initial sync error:', error);
    throw error;
  }
}
/**
 * One polling pass: sync any blocks added since the last pass and record a
 * network statistics snapshot.
 *
 * The module-level `isSyncing` flag makes the pass non-reentrant; a call made
 * while another pass is still running returns immediately. Errors are logged
 * and swallowed so a failed pass does not affect later ones.
 *
 * @returns {Promise<void>}
 */
async function monitorNewBlocks() {
  if (isSyncing) return;

  try {
    isSyncing = true;

    // 1. Check for reorg before proceeding. Blocks it orphans are excluded
    // from the "highest synced block" query below, so syncing resumes right
    // after the fork point without needing the returned height.
    await checkForReorg();

    // 2. Get current blockchain height
    const blockchainInfo = await rpcCall('getblockchaininfo');
    const chainHeight = blockchainInfo.blocks;

    // 3. Start from the block after the highest synced non-orphan block
    const lastBlock = await Block.findOne({ isOrphan: false }).sort({ height: -1 });
    const startSyncHeight = lastBlock ? lastBlock.height + 1 : 0;

    // Sync new blocks
    if (chainHeight >= startSyncHeight) {
      console.log(`🔔 Syncing blocks: ${startSyncHeight} to ${chainHeight}`);
      for (let height = startSyncHeight; height <= chainHeight; height++) {
        await syncBlock(height);
      }
      currentHeight = chainHeight;
    }

    // Update stats. The three lookups are independent, so run them together.
    const [networkInfo, miningInfo, mempoolInfo] = await Promise.all([
      rpcCall('getnetworkinfo'),
      rpcCall('getmininginfo'),
      rpcCall('getmempoolinfo')
    ]);

    await Stats.create({
      blocks: blockchainInfo.blocks,
      difficulty: blockchainInfo.difficulty,
      chainwork: blockchainInfo.chainwork,
      connections: networkInfo.connections,
      networkhashps: miningInfo.networkhashps,
      mempoolsize: mempoolInfo.size
    });
  } catch (error) {
    console.error('❌ Monitor error:', error.message);
  } finally {
    isSyncing = false;
  }
}
/**
 * Refreshes `confirmations` (and `nextblockhash`) for the 100 highest stored
 * blocks and propagates the confirmation count to their transactions.
 *
 * A block whose RPC lookup is rejected by the node (the error carries an HTTP
 * `response`) is logged and skipped so the remaining blocks are still
 * refreshed. Any other failure, such as the node being unreachable or a
 * database error, aborts the pass. Errors are logged and never rethrown.
 *
 * @returns {Promise<void>}
 */
async function updateConfirmations() {
  try {
    const recentBlocks = await Block.find().sort({ height: -1 }).limit(100);

    for (const block of recentBlocks) {
      let blockData;
      try {
        blockData = await rpcCall('getblock', [block.hash, 1]);
      } catch (error) {
        // No HTTP response means the node could not be reached at all;
        // retrying for every remaining block would only pile up timeouts.
        if (!error.response) throw error;
        console.error(`❌ Skipping confirmations update for block ${block.hash}:`, error.message);
        continue;
      }

      if (blockData.confirmations === block.confirmations) {
        continue;
      }

      await Block.updateOne(
        { hash: block.hash },
        {
          confirmations: blockData.confirmations,
          nextblockhash: blockData.nextblockhash
        }
      );
      await Transaction.updateMany(
        { blockhash: block.hash },
        { confirmations: blockData.confirmations }
      );
    }
  } catch (error) {
    console.error('❌ Update confirmations error:', error.message);
  }
}
/**
 * Entry point of the sync service.
 *
 * Connects to MongoDB, performs the initial catch-up sync, then schedules the
 * two recurring jobs: new-block monitoring every 30 seconds and confirmation
 * refresh every 5 minutes. Any failure during start-up is logged and the
 * process exits with code 1.
 *
 * @returns {Promise<void>}
 */
async function startSync() {
  const monitorIntervalMs = 30 * 1000;
  const confirmationsIntervalMs = 5 * 60 * 1000;

  try {
    await connectDB();
    console.log('🚀 S256 Block Sync Service Started');
    console.log(`📡 Connected to RPC: ${RPC_HOST}:${RPC_PORT}`);

    // Catch up with the chain before polling starts.
    await initialSync();

    console.log('👀 Monitoring for new blocks...');
    setInterval(monitorNewBlocks, monitorIntervalMs);
    setInterval(updateConfirmations, confirmationsIntervalMs);
  } catch (startupError) {
    console.error('❌ Sync service error:', startupError);
    process.exit(1);
  }
}
/**
 * Closes the MongoDB connection and terminates the process.
 *
 * Exits with code 0 after a clean close, or with code 1 when closing the
 * connection fails, so the process never lingers on a rejected close.
 *
 * @returns {Promise<void>}
 */
async function handleShutdownSignal() {
  console.log('\n⏹️ Shutting down sync service...');
  let exitCode = 0;
  try {
    await mongoose.connection.close();
  } catch (error) {
    console.error('❌ Error closing MongoDB connection:', error.message);
    exitCode = 1;
  }
  process.exit(exitCode);
}

// Handle shutdown gracefully. SIGINT covers Ctrl+C; SIGTERM is what process
// managers and container runtimes send to stop a service.
process.on('SIGINT', handleShutdownSignal);
process.on('SIGTERM', handleShutdownSignal);
// Start the sync service. The returned promise is intentionally not awaited:
// startSync() catches its own failures, logs them and exits with code 1.
startSync();