Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ Format follows [Keep a Changelog 1.1.0](https://keepachangelog.com/en/1.1.0/). I

### Added

- Three activity-leaderboard endpoints under `/v1/stats/`:
- `GET /top-attesters?n=10` — addresses ranked by attestation submissions (`attestations.submitter`). All-time count.
- `GET /top-schema-owners?n=10` — addresses ranked by schemas registered (`schemas.owner`). All-time count.
- `GET /top-attestor-sets?n=10` — attestor sets (`las1...`) ranked by `attestor_sets.schema_count` (denormalised counter maintained by ingest, so the query is a single index scan).
All three share a `LeaderboardResponse { window: "all-time", rows: [{ rank, address, count }] }` shape so Grafana table panels can use one template per leaderboard. `n` defaults to 10, capped at 100. 30s in-process cache + `Cache-Control: public, max-age=30` matching the rest of `/v1/stats/*`. Powers the chain Grafana dashboard's forthcoming "Activity leaders" row (filed as a follow-up issue) and a future explorer leaderboard page.
- `slots.da_block_height` column (migration `20260518000002_slots_da_block_height.sql`) + matching `da_block_height: Option<u64>` field on `BlockResponse`. The indexer's `extract_slot_first_batch_facts` (renamed from `extract_slot_proposer` to reflect that it now pulls two related fields from the same first-batch fetch) reads `receipt.da_block_height` from chain v0.2.3+ batch JSON and writes the BIGINT through the existing slot upsert. COALESCE-preserve semantics mirror the `proposer` column: a re-poll that can't reach batches doesn't blank a known value. `null` on slots ingested before chain v0.2.3 (no backfill yet) and on slots whose first-batch fetch fails. Powers the explorer's "View on Celenium" deep-link per `ligate-io/ligate-chain#355` (`https://mocha.celenium.io/block/{da_block_height}` — singular `/block/`; `/blocks` is the list page).
- `GET /v1/stats/drips-daily?days=N` endpoint. Returns daily faucet-drip counts broken down by source: `web` (chain-side count of `bank.transfer` txs from the faucet sender, read from the indexer's `transactions` table) and `bot` (api-side count from `bot_drips`). Powers the cost dashboard's drips-per-day panel without Grafana needing to aggregate two heterogeneous sources client-side. Same `DailyPoint`-style shape as `/v1/stats/attestations-daily`. 30s cached, capped at 90 days of history.
- `POST /v1/drip-bot` endpoint for the Discord faucet bot. Header-gated via `X-Bot-Secret`; uses the same hot-key signer as `POST /v1/drip` so there's a single nonce stream and no inter-endpoint coordination. Tier-aware amounts validated server-side (100 / 250 / 500 / 1000 LGT for newcomer / regular / veteran / elder, by Discord server tenure). 5-day cooldown, applied independently to (a) per-address and (b) per-Discord-user counters; both must clear. Endpoint disabled (returns 503) if `FAUCET_BOT_SECRET` is unset, so safe to merge before the bot is deployed.
Expand Down
5 changes: 5 additions & 0 deletions crates/api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ async fn main() -> Result<()> {
)
.route("/v1/stats/drips-daily", get(stats::drips_daily))
.route("/v1/stats/top-holders", get(stats::top_holders))
// Activity leaderboards (api#XX): "who's using the chain most".
// Each one is a small GROUP BY against the indexer query tables.
.route("/v1/stats/top-attesters", get(stats::top_attesters))
.route("/v1/stats/top-schema-owners", get(stats::top_schema_owners))
.route("/v1/stats/top-attestor-sets", get(stats::top_attestor_sets))
// chain#442 phase 3: cluster topology proxy.
.route("/v1/cluster/nodes", get(cluster::nodes))
.layer(TraceLayer::new_for_http())
Expand Down
204 changes: 204 additions & 0 deletions crates/api/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
//! | `GET /new-wallets-daily?days=7` | Timeseries of new addresses (`address_summaries.first_seen_timestamp`) bucketed by UTC day. |
//! | `GET /tx-rate-daily?days=7` | Timeseries of tx counts bucketed by UTC day, broken down by `kind` (bank.transfer, register.schema, etc.) and `outcome`. |
//! | `GET /top-holders?n=10` | Top N LGT holders by current balance, queried live from the chain's bank module (no balance index in the api yet; fine for devnet's ~10-address scale, replace with indexed column before mainnet). |
//! | `GET /top-attesters?n=10` | Addresses ranked by attestation submissions (`attestations.submitter`). All-time count. |
//! | `GET /top-schema-owners?n=10` | Addresses ranked by schemas registered (`schemas.owner`). All-time count. |
//! | `GET /top-attestor-sets?n=10` | Attestor sets (`las1...`) ranked by `attestor_sets.schema_count` (denormalised counter maintained by ingest). |
//!
//! ## Caching
//!
Expand Down Expand Up @@ -1174,3 +1177,204 @@ mod tests {
assert!(parse_interval("5y").is_err()); // unknown unit
}
}

// ============================================================================
// Activity leaderboards (top attesters, top schema owners, top attestor sets)
//
// Three endpoints under `/v1/stats/` that surface "who's using the chain
// most" for Grafana panels + future explorer leaderboard pages. All three
// share a small `TopAddressRank` shape so Grafana can render them as a
// single table-panel template per leaderboard.
//
// Why this lives in the api, not in chain Prometheus: Prometheus is bad
// at high-cardinality labels (every address becomes a label value =
// unbounded series). Grafana points at the api directly via the Infinity
// HTTP-JSON datasource for the table view.
// ============================================================================

#[derive(Deserialize)]
pub struct LeaderboardQuery {
/// Number of rows to return. Default 10, capped at 100 to keep the
/// row scan + JSON envelope cheap. Grafana panels typically render
/// 10 to 25; analysts can pull 100 via direct curl.
#[serde(default)]
n: Option<u32>,
}

/// One row in a leaderboard response. `address` is the leaderboard key
/// (a submitter / owner / set id depending on the endpoint). `count`
/// is whatever the endpoint counts (attestations, schemas, ...) — its
/// semantic is documented per endpoint, not per row, so the shape stays
/// uniform across leaderboards.
#[derive(Serialize)]
struct TopAddressRank {
rank: u32,
address: String,
count: i64,
}

#[derive(Serialize)]
struct LeaderboardResponse {
/// Window the leaderboard covers. Today all three leaderboards are
/// `"all-time"` because the indexer doesn't bucket events by time
/// at the address level. A future variant could pass `?since=24h`
/// and switch to a SUM over `transactions.indexed_at`; the response
/// shape stays the same.
window: String,
rows: Vec<TopAddressRank>,
}

/// `GET /v1/stats/top-attesters?n=10` — addresses ranked by total number
/// of attestations submitted. Counts `attestations.submitter`. Identifier
/// is a bech32m `lig1...` (28-byte rollup address).
pub async fn top_attesters(
State(state): State<AppState>,
Query(params): Query<LeaderboardQuery>,
) -> Response {
let n = params.n.unwrap_or(10).min(100);
let key = format!("top-attesters:{n}");
if let Some(cached) = state.stats_cache.get_fresh(&key, DEFAULT_TTL) {
return cached_json_response(cached, TTL_STATS_DEFAULT_SECS);
}
match compute_top_attesters(&state.pg, n).await {
Ok(payload) => match serde_json::to_string(&payload) {
Ok(body) => {
state.stats_cache.put(key, body.clone());
cached_json_response(body, TTL_STATS_DEFAULT_SECS)
}
Err(e) => error_response(e.into()),
},
Err(e) => error_response(e),
}
}

async fn compute_top_attesters(pool: &PgPool, n: u32) -> anyhow::Result<LeaderboardResponse> {
let rows: Vec<(String, i64)> = sqlx::query_as(
"SELECT submitter, COUNT(*)::BIGINT \
FROM attestations \
GROUP BY submitter \
ORDER BY COUNT(*) DESC, submitter ASC \
LIMIT $1",
)
.bind(n as i64)
.fetch_all(pool)
.await
.context("top-attesters query")?;
Ok(LeaderboardResponse {
window: "all-time".to_string(),
rows: rows
.into_iter()
.enumerate()
.map(|(i, (address, count))| TopAddressRank {
rank: (i as u32) + 1,
address,
count,
})
.collect(),
})
}

/// `GET /v1/stats/top-schema-owners?n=10` — addresses ranked by total
/// number of schemas registered (`schemas.owner`). Same `lig1...`
/// address universe as top-attesters; an address may appear in both
/// leaderboards with different counts.
pub async fn top_schema_owners(
State(state): State<AppState>,
Query(params): Query<LeaderboardQuery>,
) -> Response {
let n = params.n.unwrap_or(10).min(100);
let key = format!("top-schema-owners:{n}");
if let Some(cached) = state.stats_cache.get_fresh(&key, DEFAULT_TTL) {
return cached_json_response(cached, TTL_STATS_DEFAULT_SECS);
}
match compute_top_schema_owners(&state.pg, n).await {
Ok(payload) => match serde_json::to_string(&payload) {
Ok(body) => {
state.stats_cache.put(key, body.clone());
cached_json_response(body, TTL_STATS_DEFAULT_SECS)
}
Err(e) => error_response(e.into()),
},
Err(e) => error_response(e),
}
}

async fn compute_top_schema_owners(pool: &PgPool, n: u32) -> anyhow::Result<LeaderboardResponse> {
let rows: Vec<(String, i64)> = sqlx::query_as(
"SELECT owner, COUNT(*)::BIGINT \
FROM schemas \
GROUP BY owner \
ORDER BY COUNT(*) DESC, owner ASC \
LIMIT $1",
)
.bind(n as i64)
.fetch_all(pool)
.await
.context("top-schema-owners query")?;
Ok(LeaderboardResponse {
window: "all-time".to_string(),
rows: rows
.into_iter()
.enumerate()
.map(|(i, (address, count))| TopAddressRank {
rank: (i as u32) + 1,
address,
count,
})
.collect(),
})
}

/// `GET /v1/stats/top-attestor-sets?n=10` — attestor sets ranked by
/// number of schemas bound to them. Uses the denormalised
/// `attestor_sets.schema_count` column (kept up to date by ingest),
/// so the query is a single index scan, not a GROUP BY over schemas.
/// Identifier here is a `las1...` (the attestor-set id), not an
/// address — same response shape, different identifier space. Clients
/// route to `/v1/attestor-sets/{id}` for detail.
pub async fn top_attestor_sets(
State(state): State<AppState>,
Query(params): Query<LeaderboardQuery>,
) -> Response {
let n = params.n.unwrap_or(10).min(100);
let key = format!("top-attestor-sets:{n}");
if let Some(cached) = state.stats_cache.get_fresh(&key, DEFAULT_TTL) {
return cached_json_response(cached, TTL_STATS_DEFAULT_SECS);
}
match compute_top_attestor_sets(&state.pg, n).await {
Ok(payload) => match serde_json::to_string(&payload) {
Ok(body) => {
state.stats_cache.put(key, body.clone());
cached_json_response(body, TTL_STATS_DEFAULT_SECS)
}
Err(e) => error_response(e.into()),
},
Err(e) => error_response(e),
}
}

async fn compute_top_attestor_sets(pool: &PgPool, n: u32) -> anyhow::Result<LeaderboardResponse> {
let rows: Vec<(String, i32)> = sqlx::query_as(
"SELECT id, schema_count \
FROM attestor_sets \
WHERE schema_count > 0 \
ORDER BY schema_count DESC, id ASC \
LIMIT $1",
)
.bind(n as i64)
.fetch_all(pool)
.await
.context("top-attestor-sets query")?;
Ok(LeaderboardResponse {
window: "all-time".to_string(),
rows: rows
.into_iter()
.enumerate()
.map(|(i, (address, count))| TopAddressRank {
rank: (i as u32) + 1,
address,
count: count as i64,
})
.collect(),
})
}
Loading