Skip to content
1,284 changes: 27 additions & 1,257 deletions backend/src/services.rs

Large diffs are not rendered by default.

143 changes: 143 additions & 0 deletions backend/src/services/api_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
use async_trait::async_trait;
use sqlx::PgPool;
use uuid::Uuid;
use sha2::{Sha256, Digest};
use rand::Rng;
use crate::database::ApiKeyRepository;
use crate::models::{ApiKey, NewApiKey, ApiKeyTier};

pub struct ApiKeyService {
pub(crate) pool: PgPool,
}

impl ApiKeyService {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}

/// SHA-256 hash of an API key. API keys are long random strings with sufficient
/// entropy that bcrypt's computational cost is unnecessary and harmful to throughput.
pub fn hash_api_key(api_key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(api_key.as_bytes());
hex::encode(hasher.finalize())
}

/// Generates a cryptographically secure API key: `cl_` prefix + 64 hex chars (256 bits entropy).
pub fn generate_api_key() -> String {
let bytes: [u8; 32] = rand::thread_rng().gen();
format!("cl_{}", hex::encode(bytes))
}

pub async fn disable_inactive_keys(&self, inactive_days: i64) -> Result<u64, sqlx::Error> {
let result = sqlx::query!(
r#"
UPDATE api_keys
SET is_active = false
WHERE is_active = true
AND last_used_at IS NOT NULL
AND last_used_at < NOW() - INTERVAL '1 day' * $1
"#,
inactive_days
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
}

#[async_trait]
impl ApiKeyRepository for ApiKeyService {
async fn create_api_key(&self, api_key: NewApiKey) -> Result<ApiKey, sqlx::Error> {
sqlx::query_as!(
ApiKey,
r#"
INSERT INTO api_keys (user_id, key_hash, name, tier, rate_limit_per_minute, expires_at)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
"#,
api_key.user_id,
api_key.key_hash,
api_key.name,
api_key.tier as ApiKeyTier,
api_key.rate_limit_per_minute,
api_key.expires_at
)
.fetch_one(&self.pool)
.await
}

async fn get_api_key(&self, id: Uuid) -> Result<Option<ApiKey>, sqlx::Error> {
sqlx::query_as!(
ApiKey,
"SELECT * FROM api_keys WHERE id = $1",
id
)
.fetch_optional(&self.pool)
.await
}

async fn get_api_key_by_hash(&self, key_hash: &str) -> Result<Option<ApiKey>, sqlx::Error> {
sqlx::query_as!(
ApiKey,
"SELECT * FROM api_keys WHERE key_hash = $1 AND is_active = true",
key_hash
)
.fetch_optional(&self.pool)
.await
}

async fn list_api_keys(&self, user_id: Uuid) -> Result<Vec<ApiKey>, sqlx::Error> {
sqlx::query_as!(
ApiKey,
"SELECT * FROM api_keys WHERE user_id = $1 ORDER BY created_at DESC",
user_id
)
.fetch_all(&self.pool)
.await
}

async fn update_api_key(&self, id: Uuid, api_key: ApiKey) -> Result<ApiKey, sqlx::Error> {
sqlx::query_as!(
ApiKey,
r#"
UPDATE api_keys SET
name = $2,
tier = $3,
rate_limit_per_minute = $4,
is_active = $5,
expires_at = $6
WHERE id = $1
RETURNING *
"#,
id,
api_key.name,
api_key.tier as ApiKeyTier,
api_key.rate_limit_per_minute,
api_key.is_active,
api_key.expires_at
)
.fetch_one(&self.pool)
.await
}

async fn update_last_used(&self, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query!(
"UPDATE api_keys SET last_used_at = NOW() WHERE id = $1",
id
)
.execute(&self.pool)
.await?;
Ok(())
}

async fn revoke_api_key(&self, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query!(
"UPDATE api_keys SET is_active = false WHERE id = $1",
id
)
.execute(&self.pool)
.await?;
Ok(())
}
}
169 changes: 169 additions & 0 deletions backend/src/services/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use async_trait::async_trait;
use sqlx::PgPool;
use redis::AsyncCommands;
use crate::database::{EventRepository, GlobalStats};
use crate::models::{TrackingEvent, NewTrackingEvent, ProductStats, AppError};

pub struct EventService {
pub(crate) pool: PgPool,
pub(crate) redis_client: redis::Client,
}

impl EventService {
pub fn new(pool: PgPool, redis_client: redis::Client) -> Self {
Self { pool, redis_client }
}

pub async fn invalidate_global_stats(&self) -> Result<(), AppError> {
if let Ok(mut conn) = self.redis_client.get_multiplexed_tokio_connection().await {
let _: Result<(), _> = conn.del("cache:global_stats").await;
}
Ok(())
}
}

#[async_trait]
impl EventRepository for EventService {
async fn create_event(&self, event: NewTrackingEvent) -> Result<TrackingEvent, sqlx::Error> {
let created = sqlx::query_as!(
TrackingEvent,
r#"
INSERT INTO tracking_events (
product_id, actor_address, timestamp, event_type,
location, data_hash, note, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#,
event.product_id,
event.actor_address,
event.timestamp,
event.event_type,
event.location,
event.data_hash,
event.note,
event.metadata
)
.fetch_one(&self.pool)
.await?;

let _ = self.invalidate_global_stats().await;

Ok(created)
}

async fn get_event(&self, id: i64) -> Result<Option<TrackingEvent>, sqlx::Error> {
sqlx::query_as!(
TrackingEvent,
"SELECT * FROM tracking_events WHERE id = $1",
id
)
.fetch_optional(&self.pool)
.await
}

async fn list_events_by_product(
&self,
product_id: &str,
offset: i64,
limit: i64,
) -> Result<Vec<TrackingEvent>, sqlx::Error> {
sqlx::query_as!(
TrackingEvent,
"SELECT * FROM tracking_events WHERE product_id = $1 ORDER BY timestamp DESC LIMIT $2 OFFSET $3",
product_id,
limit,
offset
)
.fetch_all(&self.pool)
.await
}

async fn count_events_by_product(&self, product_id: &str) -> Result<i64, sqlx::Error> {
sqlx::query_scalar!(
"SELECT COUNT(*) FROM tracking_events WHERE product_id = $1",
product_id
)
.fetch_one(&self.pool)
.await
.unwrap_or(0)
}

async fn list_events_by_type(
&self,
product_id: &str,
event_type: &str,
offset: i64,
limit: i64,
) -> Result<Vec<TrackingEvent>, sqlx::Error> {
sqlx::query_as!(
TrackingEvent,
"SELECT * FROM tracking_events WHERE product_id = $1 AND event_type = $2 ORDER BY timestamp DESC LIMIT $3 OFFSET $4",
product_id,
event_type,
limit,
offset
)
.fetch_all(&self.pool)
.await
}

async fn get_product_stats(&self, product_id: &str) -> Result<Option<ProductStats>, sqlx::Error> {
sqlx::query_as!(
ProductStats,
r#"
SELECT
p.id as product_id,
(SELECT COUNT(*) FROM tracking_events WHERE product_id = p.id) as event_count,
p.is_active,
(SELECT MAX(timestamp) FROM tracking_events WHERE product_id = p.id) as last_event_at,
(SELECT event_type FROM tracking_events WHERE product_id = p.id ORDER BY timestamp DESC LIMIT 1) as last_event_type
FROM products p
WHERE p.id = $1
"#,
product_id
)
.fetch_optional(&self.pool)
.await
}

async fn get_global_stats(&self) -> Result<GlobalStats, sqlx::Error> {
let cache_key = "cache:global_stats";

if let Ok(mut conn) = self.redis_client.get_multiplexed_tokio_connection().await {
if let Ok(cached) = conn.get::<_, String>(cache_key).await {
if let Ok(stats) = serde_json::from_str::<GlobalStats>(&cached) {
return Ok(stats);
}
}
}

let stats = sqlx::query!(
r#"
SELECT
(SELECT COUNT(*) FROM products) as total_products,
(SELECT COUNT(*) FROM products WHERE is_active = true) as active_products,
(SELECT COUNT(*) FROM tracking_events) as total_events,
(SELECT COUNT(*) FROM users) as total_users,
(SELECT COUNT(*) FROM api_keys WHERE is_active = true) as active_api_keys
"#
)
.fetch_one(&self.pool)
.await?;

let global_stats = GlobalStats {
total_products: stats.total_products.unwrap_or(0),
active_products: stats.active_products.unwrap_or(0),
total_events: stats.total_events.unwrap_or(0),
total_users: stats.total_users.unwrap_or(0),
active_api_keys: stats.active_api_keys.unwrap_or(0),
};

if let Ok(mut conn) = self.redis_client.get_multiplexed_tokio_connection().await {
if let Ok(serialized) = serde_json::to_string(&global_stats) {
let _: Result<(), _> = conn.set_ex(cache_key, serialized, 300).await;
}
}

Ok(global_stats)
}
}
Loading
Loading