Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Format follows [Keep a Changelog 1.1.0](https://keepachangelog.com/en/1.1.0/). I

### Added

- `slots.da_block_height` column (migration `20260518000002_slots_da_block_height.sql`) + matching `da_block_height: Option<u64>` field on `BlockResponse`. The indexer's `extract_slot_first_batch_facts` (renamed from `extract_slot_proposer` to reflect that it now pulls two related fields from the same first-batch fetch) reads `receipt.da_block_height` from chain v0.2.3+ batch JSON and writes the BIGINT through the existing slot upsert. COALESCE-preserve semantics mirror the `proposer` column: a re-poll that can't reach batches doesn't blank a known value. `null` on slots ingested before chain v0.2.3 (no backfill yet) and on slots whose first-batch fetch fails. Powers the explorer's "View on Celenium" deep-link per `ligate-io/ligate-chain#355` (`https://mocha.celenium.io/blocks/{da_block_height}`).
- `GET /v1/stats/drips-daily?days=N` endpoint. Returns daily faucet-drip counts broken down by source: `web` (chain-side count of `bank.transfer` txs from the faucet sender, read from the indexer's `transactions` table) and `bot` (api-side count from `bot_drips`). Powers the cost dashboard's drips-per-day panel without Grafana needing to aggregate two heterogeneous sources client-side. Same `DailyPoint`-style shape as `/v1/stats/attestations-daily`. 30s cached, capped at 90 days of history.
- `POST /v1/drip-bot` endpoint for the Discord faucet bot. Header-gated via `X-Bot-Secret`; uses the same hot-key signer as `POST /v1/drip` so there's a single nonce stream and no inter-endpoint coordination. Tier-aware amounts validated server-side (100 / 250 / 500 / 1000 LGT for newcomer / regular / veteran / elder, by Discord server tenure). 5-day cooldown, applied independently to (a) per-address and (b) per-Discord-user counters; both must clear. Endpoint disabled (returns 503) if `FAUCET_BOT_SECRET` is unset, so safe to merge before the bot is deployed.
- `bot_drips` Postgres table for durable cooldown state (`migrations/20260518000001_bot_drips.sql`). Two B-tree indexes on `(address, dripped_at DESC)` and `(discord_user_id, dripped_at DESC)` so each cooldown check is a single index seek. Cooldowns persisted to Postgres (not in-memory like the web faucet's `RateLimiter`) because Railway restarts during a 5-day window would otherwise lose multi-day cooldowns.
Expand Down
5 changes: 5 additions & 0 deletions crates/api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ fn slot_to_block_response(row: queries::SlotRow) -> BlockResponse {
finalized_at: row
.finalized_at
.map(|t| t.to_rfc3339_opts(SecondsFormat::Millis, true)),
// Cast i64 (SQL BIGINT) → u64 for the API surface. Celestia
// heights fit in i64 for ~292 billion years at 1 block/sec,
// so the cast is safe in practice. Stays `None` for slots
// ingested before chain v0.2.3 shipped the field.
da_block_height: row.da_block_height.map(|h| h as u64),
}
}

Expand Down
16 changes: 13 additions & 3 deletions crates/api/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ pub struct SlotRow {
/// before we tracked it. See indexer's `repoll_pending_loop`
/// for how this is populated.
pub finalized_at: Option<DateTime<Utc>>,
/// Celestia (DA) block height where this slot's first batch
/// landed. Source: indexer's `extract_slot_first_batch_facts`,
/// reading `receipt.da_block_height` from the chain JSON (chain
/// v0.2.3+, ligate-io/ligate-chain#355). `None` on rows ingested
/// before chain v0.2.3 (no backfill yet) or whose first-batch
/// fetch failed. Powers the explorer's Celenium deep-link.
pub da_block_height: Option<i64>,
}

/// Read the highest slot height the indexer has written. `None` for
Expand All @@ -61,7 +68,7 @@ pub async fn max_slot_height(pool: &PgPool) -> sqlx::Result<Option<i64>> {
pub async fn slot_by_height(pool: &PgPool, height: i64) -> sqlx::Result<Option<SlotRow>> {
let row = sqlx::query_as::<_, SlotTuple>(
"SELECT height, hash, prev_hash, state_root, timestamp,
batch_count, tx_count, proposer, finality_status, finalized_at
batch_count, tx_count, proposer, finality_status, finalized_at, da_block_height
FROM slots WHERE height = $1",
)
.bind(height)
Expand All @@ -87,6 +94,7 @@ type SlotTuple = (
Option<String>, // proposer
Option<String>, // finality_status
Option<DateTime<Utc>>, // finalized_at
Option<i64>, // da_block_height
);

fn slot_row_from_tuple(t: SlotTuple) -> SlotRow {
Expand All @@ -101,6 +109,7 @@ fn slot_row_from_tuple(t: SlotTuple) -> SlotRow {
proposer,
finality_status,
finalized_at,
da_block_height,
) = t;
SlotRow {
height,
Expand All @@ -113,6 +122,7 @@ fn slot_row_from_tuple(t: SlotTuple) -> SlotRow {
proposer,
finality_status,
finalized_at,
da_block_height,
}
}

Expand All @@ -137,7 +147,7 @@ pub async fn slots_page(
Some(h) => {
sqlx::query_as(
"SELECT height, hash, prev_hash, state_root, timestamp,
batch_count, tx_count, proposer, finality_status, finalized_at
batch_count, tx_count, proposer, finality_status, finalized_at, da_block_height
FROM slots
WHERE height < $1
ORDER BY height DESC
Expand All @@ -151,7 +161,7 @@ pub async fn slots_page(
None => {
sqlx::query_as(
"SELECT height, hash, prev_hash, state_root, timestamp,
batch_count, tx_count, proposer, finality_status, finalized_at
batch_count, tx_count, proposer, finality_status, finalized_at, da_block_height
FROM slots
ORDER BY height DESC
LIMIT $1",
Expand Down
12 changes: 12 additions & 0 deletions crates/api/src/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ pub struct BlockResponse {
/// `/v1/stats/finality`.
#[serde(skip_serializing_if = "Option::is_none")]
pub finalized_at: Option<String>,
/// Celestia (DA) block height where this slot's first batch's
/// blob was included. Extracted by the indexer from the chain's
/// `receipt.da_block_height` field (added in chain v0.2.3 per
/// ligate-io/ligate-chain#355). Powers the explorer's
/// "View on Celenium" deep-link: the per-block UI builds
/// `https://mocha.celenium.io/blocks/{da_block_height}`.
///
/// `null` for blocks indexed before this field shipped (chain
/// v0.2.2 and earlier did not emit it; no backfill yet) and for
/// the rare slot whose first-batch fetch failed at ingest time.
#[serde(skip_serializing_if = "Option::is_none")]
pub da_block_height: Option<u64>,
}

/// One transaction, served at `GET /v1/txs/{hash}` and as each
Expand Down
25 changes: 22 additions & 3 deletions crates/indexer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,18 @@ pub async fn write_chain_identity(pool: &PgPool, info: &RollupInfo) -> Result<()
/// observation, not the true chain finalization moment (chain
/// doesn't emit that), but it's within one indexer poll interval
/// of truth — accurate enough for `/v1/stats/finality` percentiles.
pub async fn upsert_slot(pool: &PgPool, slot: &SlotResponse, proposer: Option<&str>) -> Result<()> {
pub async fn upsert_slot(
pool: &PgPool,
slot: &SlotResponse,
proposer: Option<&str>,
// Celestia (DA) block height where this slot's first batch's blob
// was included. Extracted from `receipt.da_block_height` by the
// caller; `None` for slots ingested before chain v0.2.3 (the field
// didn't exist) or for slots whose first-batch fetch failed.
// Same COALESCE-preserve pattern as `proposer` so a re-poll that
// can't refetch batches doesn't blank a known value.
da_block_height: Option<i64>,
) -> Result<()> {
// Chain emits `slot.timestamp` as Unix MILLISECONDS (verified
// against localnet: 1778527856952 → 2026-05-11T...). Earlier
// code parsed via `timestamp_opt(s, 0)` which treats the input
Expand Down Expand Up @@ -115,7 +126,8 @@ pub async fn upsert_slot(pool: &PgPool, slot: &SlotResponse, proposer: Option<&s
sqlx::query(
"INSERT INTO slots (
height, hash, prev_hash, state_root, timestamp,
batch_count, tx_count, proposer, finality_status, finalized_at, raw
batch_count, tx_count, proposer, finality_status, finalized_at, raw,
da_block_height
)
VALUES (
$1, $2, $3, $4, $5,
Expand All @@ -126,7 +138,8 @@ pub async fn upsert_slot(pool: &PgPool, slot: &SlotResponse, proposer: Option<&s
-- chain reports 'pending' first, so this is NULL and
-- the re-poll loop will stamp NOW() on the flip.
CASE WHEN $9 = 'finalized' THEN NOW() ELSE NULL END,
$10
$10,
$11
)
ON CONFLICT (height) DO UPDATE SET
hash = EXCLUDED.hash,
Expand All @@ -153,6 +166,11 @@ pub async fn upsert_slot(pool: &PgPool, slot: &SlotResponse, proposer: Option<&s
ELSE slots.finalized_at
END,
raw = EXCLUDED.raw,
-- COALESCE-preserve for da_block_height too: a re-poll
-- that can't reach the first batch doesn't blank a known
-- height. Slots from chain v0.2.2 and earlier (no field)
-- stay NULL forever unless explicitly backfilled.
da_block_height = COALESCE(EXCLUDED.da_block_height, slots.da_block_height),
indexed_at = NOW()",
)
.bind(slot.number as i64)
Expand All @@ -165,6 +183,7 @@ pub async fn upsert_slot(pool: &PgPool, slot: &SlotResponse, proposer: Option<&s
.bind(proposer)
.bind(slot.finality_status.as_deref())
.bind(raw)
.bind(da_block_height)
.execute(pool)
.await?;
Ok(())
Expand Down
94 changes: 69 additions & 25 deletions crates/indexer/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,24 @@ pub async fn run(client: NodeClient, pool: PgPool, start_height: Option<u64>) ->
match client.slot_at(h).await {
Ok(Some(slot)) => {
// Fetch the proposer (sequencer's Celestia
// `da_address`) from the slot's first batch.
// Tolerates failure: a missing proposer just
// leaves the column NULL; the slot still lands.
// `da_address`) AND da_block_height from the slot's
// first batch. Tolerates failure: missing values
// just leave the columns NULL; the slot still lands.
// ingest_slot_transactions below will re-fetch
// batches for tx processing; we accept the
// duplicate fetch for ~6s/slot rate so the
// slot upsert path stays simple. If this
// becomes a hot path, refactor to share the
// batch object between the two consumers.
let proposer = extract_slot_proposer(&client, &slot).await;
if let Err(e) = db::upsert_slot(&pool, &slot, proposer.as_deref()).await {
// slot upsert path stays simple. If this becomes
// a hot path, refactor to share the batch object
// between the two consumers.
let facts = extract_slot_first_batch_facts(&client, &slot).await;
if let Err(e) = db::upsert_slot(
&pool,
&slot,
facts.proposer.as_deref(),
facts.da_block_height,
)
.await
{
error!(error = %e, height = h, "writing slot; will retry");
tokio::time::sleep(ERROR_BACKOFF).await;
// Don't advance `next_height`; outer loop
Expand Down Expand Up @@ -466,41 +473,78 @@ async fn update_address_summaries(
/// processing. Acceptable at devnet-1 rate (~1 slot per 6 seconds);
/// if we ever need to optimize, refactor to share the LedgerBatch
/// between both consumers.
async fn extract_slot_proposer(client: &NodeClient, slot: &SlotResponse) -> Option<String> {
let batch_range = slot.batch_range.as_ref()?;
/// Per-slot data we extract from the first batch's receipt. Tuple
/// types stay in scope for both `extract_slot_first_batch_facts` and
/// its caller in the forward-walk loop; promoting to a named struct
/// is purely cosmetic at this size.
#[derive(Default)]
pub(crate) struct SlotFirstBatchFacts {
/// `receipt.da_address` — the sequencer's Celestia wallet. Same
/// semantics as before this change; named for clarity since the
/// tuple now carries two related fields.
pub proposer: Option<String>,
/// `receipt.da_block_height` — the Celestia mocha-4 block height
/// where the batch's blob was included. Source: chain v0.2.3+
/// (ligate-io/ligate-chain#355). `None` for batches produced
/// before the chain shipped the field, OR for batches whose
/// first-batch fetch failed.
pub da_block_height: Option<i64>,
}

async fn extract_slot_first_batch_facts(
client: &NodeClient,
slot: &SlotResponse,
) -> SlotFirstBatchFacts {
let Some(batch_range) = slot.batch_range.as_ref() else {
return SlotFirstBatchFacts::default();
};
let first_batch_number = batch_range.start;
let batch = match client.batch_at(first_batch_number).await {
Ok(Some(b)) => b,
Ok(None) => {
debug!(
slot = slot.number,
batch = first_batch_number,
"first batch 404 during proposer extract; proposer stays NULL"
"first batch 404 during slot-facts extract; proposer + da_block_height stay NULL"
);
return None;
return SlotFirstBatchFacts::default();
}
Err(e) => {
debug!(
slot = slot.number,
batch = first_batch_number,
error = %e,
"first batch fetch failed during proposer extract; proposer stays NULL"
"first batch fetch failed during slot-facts extract; proposer + da_block_height stay NULL"
);
return None;
return SlotFirstBatchFacts::default();
}
};
// LedgerBatch's typed shape ends at `tx_range`; everything below
// is in the `raw` catch-all map. `receipt.da_address` is the
// path we care about. Multi-level Value drill-down with as_str()
// at the leaf — returns None on any missing layer or wrong type.
let da_address = batch
.raw
.get("receipt")?
.as_object()?
.get("da_address")?
.as_str()?
.to_string();
Some(da_address)
// is in the `raw` catch-all map. `receipt.da_address` +
// `receipt.da_block_height` are the two fields we extract here.
// Multi-level Value drill-down with type-checked leaves — returns
// None on any missing layer or wrong type.
let receipt = match batch.raw.get("receipt").and_then(|v| v.as_object()) {
Some(r) => r,
None => return SlotFirstBatchFacts::default(),
};
let proposer = receipt
.get("da_address")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// `da_block_height` came online in chain v0.2.3 via #355. Treat
// missing field, JSON null, or non-u64 shape as None — old
// batches from v0.2.2 and earlier have nothing here. Cast to
// i64 for the SQL bind (BIGINT). Celestia heights fit in i64
// for ~292 billion years at 1 block/sec, so the cast is safe.
let da_block_height = receipt
.get("da_block_height")
.and_then(|v| v.as_u64())
.and_then(|h| i64::try_from(h).ok());
SlotFirstBatchFacts {
proposer,
da_block_height,
}
}

/// Background task: scan for `finality_status = 'pending'` slots,
Expand Down
19 changes: 19 additions & 0 deletions migrations/20260518000002_slots_da_block_height.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- slots.da_block_height — Celestia (DA) block height where this slot's
-- batch blob was included. Powers explorer-side "View on Celenium"
-- deep-links per ligate-io/ligate-chain#355.
--
-- Nullable for two reasons:
-- 1. Historical rows ingested before this migration land here with
-- NULL until an optional one-shot backfill re-fetches each slot's
-- first batch and updates the column.
-- 2. Slots whose first batch fetch fails (404 / transient RPC error)
-- keep NULL even after the migration. The slot row itself still
-- lands; only the deep-link is unavailable.
--
-- BIGINT (Postgres i64) is the right shape: Celestia heights are u64
-- but a chain producing a block per second wouldn't hit i64::MAX
-- for ~292 billion years. The indexer-side bind goes through i64 for
-- sqlx ergonomics; same convention as the existing `slot.height`
-- column.

ALTER TABLE slots ADD COLUMN IF NOT EXISTS da_block_height BIGINT;
Loading