From 6c7965c7c809719d281cc8369af0dcc2769cb6bf Mon Sep 17 00:00:00 2001 From: Quantara CI Date: Sun, 21 Jun 2026 02:21:27 +0100 Subject: [PATCH] feat: add structured JSON logging with structlog --- quantara/pyproject.toml | 1 + quantara/web_app/api/main.py | 27 ++- quantara/web_app/api/position.py | 2 + quantara/web_app/api/telegram.py | 5 +- quantara/web_app/api/user.py | 9 +- quantara/web_app/api/vault.py | 7 +- .../web_app/contract_tools/blockchain_call.py | 42 ++-- quantara/web_app/db/crud/base.py | 7 +- quantara/web_app/db/crud/deposit.py | 5 +- quantara/web_app/db/crud/position.py | 31 +-- quantara/web_app/db/crud/user.py | 19 +- quantara/web_app/tasks/claim_airdrops.py | 57 ++---- .../web_app/tests/test_structured_logging.py | 183 ++++++++++++++++++ quantara/web_app/utils/__init__.py | 0 quantara/web_app/utils/logger.py | 90 +++++++++ 15 files changed, 372 insertions(+), 113 deletions(-) create mode 100644 quantara/web_app/tests/test_structured_logging.py create mode 100644 quantara/web_app/utils/__init__.py create mode 100644 quantara/web_app/utils/logger.py diff --git a/quantara/pyproject.toml b/quantara/pyproject.toml index 791e57f8a..20f9a4fee 100644 --- a/quantara/pyproject.toml +++ b/quantara/pyproject.toml @@ -34,6 +34,7 @@ faker = "^30.8.1" notebook = "^7.2.2" sentry-sdk = {extras = ["fastapi"], version = "^2.18.0"} slowapi = "^0.1.9" +structlog = ">=24.1.0" [tool.poetry.group.dev.dependencies] black = "24.8.0" isort = "5.13.2" diff --git a/quantara/web_app/api/main.py b/quantara/web_app/api/main.py index 3ca0a9a90..fb99576f6 100644 --- a/quantara/web_app/api/main.py +++ b/quantara/web_app/api/main.py @@ -6,9 +6,9 @@ authentication endpoints, and exposes a /health endpoint for CI orchestration. """ -import logging import os import asyncio +import uuid from contextlib import asynccontextmanager from fastapi import FastAPI, Request, Response, Depends @@ -35,8 +35,11 @@ from web_app.api.middleware import MaxBodySizeMiddleware from web_app.db.database import init_db from web_app.db.database import init_db, get_database +from web_app.utils.logger import configure_logging, get_logger +import structlog -logger = logging.getLogger(__name__) +configure_logging() +logger = get_logger(__name__) DEFAULT_CORS_ORIGINS = ["http://localhost:3000"] CORS_ALLOW_METHODS = ["GET", "POST"] CORS_ALLOW_HEADERS = ["Content-Type", "Authorization", "X-Wallet-Id", "X-Nonce", "X-Signature"] @@ -63,6 +66,7 @@ async def lifespan(app: FastAPI): Handles application startup and shutdown events. """ init_db() + configure_logging() # Validate required environment variables at startup. assert_valid_config() @@ -117,7 +121,10 @@ async def global_exception_handler(request: Request, exc: Exception): :return: JSON response with a generic error message and 500 status. """ logger.exception( - "Unhandled exception on %s %s", request.method, request.url.path + "unhandled_exception", + method=request.method, + path=request.url.path, + request_id=structlog.contextvars.get_contextvars().get("request_id", "-"), ) return JSONResponse( status_code=500, @@ -143,6 +150,16 @@ async def global_exception_handler(request: Request, exc: Exception): app.add_middleware(MaxBodySizeMiddleware, max_body_size=1024*1024) +@app.middleware("http") +async def request_id_middleware(request: Request, call_next): + """Attach a unique request_id to every structlog context.""" + request_id = request.headers.get("X-Request-Id", str(uuid.uuid4())) + with structlog.contextvars.bound_contextvars(request_id=request_id): + response = await call_next(request) + response.headers["X-Request-Id"] = request_id + return response + + @app.get("/health", tags=["Health"], summary="Health check endpoint") async def health_check(response: Response, db: Session = Depends(get_database)): """Returns 200 OK when the service is running and dependencies are healthy.""" @@ -156,7 +173,7 @@ async def health_check(response: Response, db: Session = Depends(get_database)): asyncio.to_thread(db.execute, text("SELECT 1")), timeout=2.0 ) except Exception as e: - logger.error(f"Database health check failed: {e}") + logger.error("health_check_db_failed", error=str(e)) health_status["database"] = "down" is_healthy = False @@ -167,7 +184,7 @@ async def health_check(response: Response, db: Session = Depends(get_database)): await asyncio.wait_for(r.ping(), timeout=2.0) await r.close() except Exception as e: - logger.error(f"Redis health check failed: {e}") + logger.error("health_check_redis_failed", error=str(e)) health_status["redis"] = "down" is_healthy = False diff --git a/quantara/web_app/api/position.py b/quantara/web_app/api/position.py index 3748b64be..ae7501e0f 100644 --- a/quantara/web_app/api/position.py +++ b/quantara/web_app/api/position.py @@ -27,8 +27,10 @@ from web_app.contract_tools.blockchain_call import StellarClient from web_app.db.models import Status, TransactionStatus from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT, READ_LIMIT +from web_app.utils.logger import get_logger router = APIRouter() +logger = get_logger(__name__) position_db_connector = PositionDBConnector() transaction_db_connector = TransactionDBConnector() diff --git a/quantara/web_app/api/telegram.py b/quantara/web_app/api/telegram.py index c34c4c519..a8ce77f3e 100644 --- a/quantara/web_app/api/telegram.py +++ b/quantara/web_app/api/telegram.py @@ -23,6 +23,9 @@ ) from web_app.api.rate_limiter import limiter, WRITE_LIMIT, READ_LIMIT +from web_app.utils.logger import get_logger + +_logger = get_logger(__name__) # Create a FastAPI router for handling Telegram webhook requests router = APIRouter() @@ -98,7 +101,7 @@ async def telegram_webhook(update: Update): result = await dp.feed_webhook_update(bot, update, db=db_connector) return build_multipart_response(bot, result) except (ValueError, KeyError, TypeError, AttributeError) as e: - logger.error(f"Error processing Telegram update {update.update_id}: {e}") + _logger.error("telegram_update_error", update_id=update.update_id, error=str(e)) return b"", 200 diff --git a/quantara/web_app/api/user.py b/quantara/web_app/api/user.py index 6f1714986..8169c69ee 100644 --- a/quantara/web_app/api/user.py +++ b/quantara/web_app/api/user.py @@ -28,8 +28,9 @@ ) from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT, READ_LIMIT +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) router = APIRouter() # Initialize the router telegram_db = TelegramUserDBConnector() @@ -254,7 +255,7 @@ async def get_stats(request: Request) -> GetStatsResponse: for token, amount in token_amounts.items(): # Skip if no price available for the token if token not in current_prices: - logger.warning(f"No price data available for {token}") + logger.warning("no_price_data", token=token) continue # If the token is USDC, use it directly @@ -276,7 +277,7 @@ async def get_stats(request: Request) -> GetStatsResponse: ) except Exception as e: - logger.exception("Error in get_stats") + logger.error("get_stats_error", exc_info=True) raise HTTPException(status_code=500, detail="Internal server error") @@ -335,6 +336,6 @@ async def save_bug_report(request: Request, report: BugReportRequest) -> BugRepo if isinstance(e, HTTPException): raise - logger.error(f"Failed to submit bug report: {str(e)}") + logger.error("bug_report_failed", error=str(e)) sentry_sdk.capture_exception(e) raise HTTPException(status_code=500, detail="Failed to submit bug report") diff --git a/quantara/web_app/api/vault.py b/quantara/web_app/api/vault.py index d74606642..1a7565a7b 100644 --- a/quantara/web_app/api/vault.py +++ b/quantara/web_app/api/vault.py @@ -16,8 +16,9 @@ VaultDepositResponse, ) from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) router = APIRouter(prefix="/api/vault", tags=["vault"]) @@ -34,7 +35,7 @@ async def deposit_to_vault( Requires wallet signature authentication via X-Wallet-Id, X-Nonce, and X-Signature headers. """ - logger.info(f"Processing deposit request for wallet {body.wallet_id}") + logger.info("vault_deposit_request", wallet_id=body.wallet_id) try: user_db = UserDBConnector() @@ -53,7 +54,7 @@ async def deposit_to_vault( symbol=body.symbol, ) except (ValueError, TypeError) as e: - logger.error(f"Invalid input data: {str(e)}") + logger.error("vault_deposit_invalid_input", error=str(e)) raise HTTPException(status_code=400, detail=str(e)) diff --git a/quantara/web_app/contract_tools/blockchain_call.py b/quantara/web_app/contract_tools/blockchain_call.py index 89bdd2647..4be26d312 100644 --- a/quantara/web_app/contract_tools/blockchain_call.py +++ b/quantara/web_app/contract_tools/blockchain_call.py @@ -16,8 +16,9 @@ import aiohttp from .constants import MULTIPLIER_POWER, TokenParams +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # Base64-encoded "wasm_hash" key for Soroban getContractData RPC calls _SOROBAN_WASM_HASH_KEY = "dHJ1c3RlZAB3YXNoX2hhc2g=" @@ -75,22 +76,17 @@ async def get_balance( async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 404: - logger.info( - "Account %s not found on Stellar network", - holder_address, - ) + logger.info("horizon_account_not_found", account=holder_address) return "0" if response.status != 200: - logger.warning( - "Horizon returned %d for %s", response.status, url - ) + logger.warning("horizon_unexpected_status", status=response.status, url=url) return "0" account = await response.json() except aiohttp.ClientError as exc: - logger.error("Network error fetching account %s: %s", holder_address, exc) + logger.error("horizon_network_error", account=holder_address, error=str(exc)) return "0" except (ValueError, KeyError, TypeError) as exc: - logger.error("Data error fetching account %s: %s", holder_address, exc) + logger.error("horizon_data_error", account=holder_address, error=str(exc)) return "0" code = asset_code.lower() @@ -129,9 +125,7 @@ async def get_token_balances( ) balances[token.name] = bal except (aiohttp.ClientError, ValueError, KeyError) as exc: - logger.info( - "Failed to get balance for %s: %s", token.name, exc - ) + logger.info("token_balance_fetch_failed", token=token.name, error=str(exc)) return balances # ------------------------------------------------------------------ # @@ -213,24 +207,16 @@ async def is_contract_deployed(self, contract_id: str) -> bool: if response.status == 200: data = await response.json() if "error" in data: - logger.warning( - "RPC error checking contract %s: %s", - contract_id, - data["error"], - ) + logger.warning("rpc_contract_error", contract_id=contract_id, error=data["error"]) return False return "result" in data - logger.warning( - "RPC returned %d checking contract %s", - response.status, - contract_id, - ) + logger.warning("rpc_unexpected_status", status=response.status, contract_id=contract_id) return False except aiohttp.ClientError as e: - logger.error("Network error checking contract %s: %s", contract_id, e) + logger.error("rpc_network_error", contract_id=contract_id, error=str(e)) return False except (ValueError, KeyError, TypeError) as e: - logger.error("Data error checking contract %s: %s", contract_id, e) + logger.error("rpc_data_error", contract_id=contract_id, error=str(e)) return False async def fetch_portfolio(self, contract_address: str) -> dict: @@ -252,11 +238,7 @@ async def fetch_portfolio(self, contract_address: str) -> dict: "decimals": token.decimals, } except (aiohttp.ClientError, ValueError, KeyError) as exc: - logger.info( - "Failed to get portfolio balance for %s: %s", - token.name, - exc, - ) + logger.info("portfolio_balance_fetch_failed", token=token.name, error=str(exc)) return results diff --git a/quantara/web_app/db/crud/base.py b/quantara/web_app/db/crud/base.py index ed9eb62e8..a1726380f 100644 --- a/quantara/web_app/db/crud/base.py +++ b/quantara/web_app/db/crud/base.py @@ -12,8 +12,9 @@ from web_app.db.database import get_database_url from web_app.db.models import AirDrop, Base +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ModelType = TypeVar("ModelType", bound=Base) @@ -71,7 +72,7 @@ def get_object( try: return db.query(model).filter(model.id == obj_id).first() except SQLAlchemyError as e: - logger.error("Failed to get object by id: %s", e) + logger.error("db_get_object_failed", error=str(e)) return None finally: db.close() @@ -127,7 +128,7 @@ def delete_object(self, object: Base) -> None: except SQLAlchemyError as e: db.rollback() - logger.error(f"Error deleting object: {e}") + logger.error("db_delete_object_failed", error=str(e)) finally: db.close() diff --git a/quantara/web_app/db/crud/deposit.py b/quantara/web_app/db/crud/deposit.py index c8cb361ae..b6450b738 100644 --- a/quantara/web_app/db/crud/deposit.py +++ b/quantara/web_app/db/crud/deposit.py @@ -9,8 +9,9 @@ from web_app.db.models import Base, User, Vault from .base import DBConnector +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ModelType = TypeVar("ModelType", bound=Base) @@ -45,7 +46,7 @@ def get_vault(self, wallet_id: str, symbol: str) -> Vault | None: with self.Session() as db: user = self.get_object_by_field(User, "wallet_id", wallet_id) if not user: - logger.error(f"User with wallet id {wallet_id} not found") + logger.error("db_get_vault_user_not_found", wallet_id=wallet_id) return None vault = db.query(Vault).filter_by(user_id=user.id, symbol=symbol).first() return vault diff --git a/quantara/web_app/db/crud/position.py b/quantara/web_app/db/crud/position.py index cc10277a7..f7a85a4e4 100644 --- a/quantara/web_app/db/crud/position.py +++ b/quantara/web_app/db/crud/position.py @@ -16,8 +16,9 @@ from web_app.db.models import Base, ExtraDeposit, Position, Status, Transaction, User from .user import UserDBConnector +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ModelType = TypeVar("ModelType", bound=Base) @@ -101,7 +102,7 @@ def get_positions_by_wallet_id( return positions_dict except SQLAlchemyError as e: - logger.error(f"Failed to retrieve positions: {str(e)}") + logger.error("db_get_positions_failed", error=str(e)) return [] def get_all_positions_by_wallet_id( @@ -136,7 +137,7 @@ def get_all_positions_by_wallet_id( return [self._position_to_dict(position) for position in positions] except SQLAlchemyError as e: - logger.error(f"Failed to retrieve positions: {str(e)}") + logger.error("db_get_all_positions_failed", error=str(e)) return [] def get_count_positions_by_wallet_id(self, wallet_id: str) -> int: @@ -160,7 +161,7 @@ def get_count_positions_by_wallet_id(self, wallet_id: str) -> int: return total_positions or 0 except SQLAlchemyError as e: - logger.error(f"Failed to count user positions: {str(e)}") + logger.error("db_count_positions_failed", error=str(e)) return 0 def has_opened_position(self, wallet_id: str) -> bool: @@ -186,7 +187,7 @@ def has_opened_position(self, wallet_id: str) -> bool: return position_exists except SQLAlchemyError as e: - logger.error(f"Failed to check for opened positions: {str(e)}") + logger.error("db_check_opened_positions_failed", error=str(e)) return False def create_position( @@ -203,7 +204,7 @@ def create_position( """ user = self._get_user_by_wallet_id(wallet_id) if not user: - logger.error(f"User with wallet ID {wallet_id} not found") + logger.error("db_create_position_user_not_found", wallet_id=wallet_id) return None # Check if a position with status 'pending' already exists for this user @@ -297,7 +298,7 @@ def open_position(self, position_id: uuid.UUID, current_prices: dict) -> str | N self.save_current_price(position, current_prices) return position.status else: - logger.error(f"Position with ID {position_id} not found") + logger.error("db_open_position_not_found", position_id=str(position_id)) return None def get_repay_data(self, wallet_id: str) -> tuple: @@ -344,7 +345,7 @@ def get_total_amounts_for_open_positions(self) -> dict[str, Decimal]: return {token: Decimal(str(amount)) for token, amount in token_amounts} except SQLAlchemyError as e: - logger.error(f"Error calculating amounts for open positions: {e}") + logger.error("db_calc_open_amounts_failed", error=str(e)) return {} def save_current_price(self, position: Position, price_dict: dict) -> None: @@ -357,7 +358,7 @@ def save_current_price(self, position: Position, price_dict: dict) -> None: position.start_price = start_price self.write_to_db(position) except SQLAlchemyError as e: - logger.error(f"Error while saving current_price for position: {e}") + logger.error("db_save_current_price_failed", error=str(e)) def save_transaction( self, position_id: uuid.UUID, status: str, transaction_hash: str @@ -381,7 +382,7 @@ def save_transaction( ) return self.write_to_db(transaction) except SQLAlchemyError as e: - logger.error(f"Failed to save transaction: {str(e)}") + logger.error("db_save_transaction_failed", error=str(e)) return None def liquidate_position(self, position_id: UUID) -> bool: @@ -398,18 +399,18 @@ def liquidate_position(self, position_id: UUID) -> bool: position = db.query(Position).filter(Position.id == position_id).first() if not position: - logger.warning(f"Position with ID {position_id} not found.") + logger.warning("db_liquidate_position_not_found", position_id=str(position_id)) return False position.is_liquidated = True position.datetime_liquidation = datetime.now() self.write_to_db(position) - logger.info(f"Position {position_id} successfully liquidated.") + logger.info("db_position_liquidated", position_id=str(position_id)) return True except SQLAlchemyError as e: - logger.error(f"Error liquidating position {position_id}: {str(e)}") + logger.error("db_liquidate_position_failed", position_id=str(position_id), error=str(e)) db.rollback() return False @@ -445,7 +446,7 @@ def get_all_liquidated_positions(self) -> list[dict]: ] except SQLAlchemyError as e: - logger.error(f"Error retrieving liquidated positions: {str(e)}") + logger.error("db_get_liquidated_positions_failed", error=str(e)) return [] def get_position_by_id(self, position_id: UUID) -> Position | None: @@ -468,7 +469,7 @@ def delete_all_user_positions(self, user_id: uuid.UUID) -> None: db.delete(position) db.commit() except SQLAlchemyError as e: - logger.error(f"Error deleting positions for user {user_id}: {str(e)}") + logger.error("db_delete_positions_failed", user_id=str(user_id), error=str(e)) def add_extra_deposit_to_position( self, position: Position, token_symbol: str, amount: str diff --git a/quantara/web_app/db/crud/user.py b/quantara/web_app/db/crud/user.py index 52e503d86..9df3a4e67 100644 --- a/quantara/web_app/db/crud/user.py +++ b/quantara/web_app/db/crud/user.py @@ -10,8 +10,9 @@ from web_app.db.models import Base, Position, Status, TelegramUser, User from .base import DBConnector +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ModelType = TypeVar("ModelType", bound=Base) @@ -42,7 +43,7 @@ def get_users_for_notifications(self) -> list[tuple[str, str]]: ) return results except SQLAlchemyError as e: - logger.error(f"Error retrieving users with OPENED positions: {e}") + logger.error("db_get_notification_users_failed", error=str(e)) return [] def get_user_by_wallet_id(self, wallet_id: str) -> User | None: @@ -95,7 +96,7 @@ def get_unique_users_count(self) -> int: return unique_users_count except SQLAlchemyError as e: - logger.error(f"Failed to retrieve unique users count: {str(e)}") + logger.error("db_unique_users_count_failed", error=str(e)) return 0 def fetch_user_history(self, user_id: int) -> list[dict]: @@ -141,9 +142,7 @@ def fetch_user_history(self, user_id: int) -> list[dict]: ] except SQLAlchemyError as e: - logger.error( - f"Failed to fetch user history for user_id={user_id}: {str(e)}" - ) + logger.error("db_fetch_user_history_failed", user_id=user_id, error=str(e)) return [] def delete_user_by_wallet_id(self, wallet_id: str) -> None: @@ -161,12 +160,10 @@ def delete_user_by_wallet_id(self, wallet_id: str) -> None: if user: session.delete(user) session.commit() - logger.info( - f"User with wallet_id {wallet_id} deleted successfully." - ) + logger.info("db_user_deleted", wallet_id=wallet_id) else: - logger.warning(f"No user found with wallet_id {wallet_id}.") + logger.warning("db_user_not_found", wallet_id=wallet_id) except SQLAlchemyError as e: session.rollback() - logger.error(f"Failed to delete user with wallet_id {wallet_id}: {e}") + logger.error("db_delete_user_failed", wallet_id=wallet_id, error=str(e)) raise e diff --git a/quantara/web_app/tasks/claim_airdrops.py b/quantara/web_app/tasks/claim_airdrops.py index 77bbb5fe7..e90f5c198 100644 --- a/quantara/web_app/tasks/claim_airdrops.py +++ b/quantara/web_app/tasks/claim_airdrops.py @@ -12,8 +12,9 @@ from sqlalchemy.exc import SQLAlchemyError from web_app.contract_tools.airdrop import AirdropFetcher from web_app.db.crud import AirDropDBConnector +from web_app.utils.logger import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class AirdropClaimer: @@ -35,15 +36,13 @@ async def claim_airdrops(self) -> None: """ unclaimed_airdrops = self.db_connector.get_all_unclaimed() if not unclaimed_airdrops: - logger.info("No unclaimed airdrops found") + logger.info("airdrop_no_unclaimed") return for airdrop in unclaimed_airdrops: try: user_contract_address = airdrop.user.contract_address if not user_contract_address: - logger.warning( - "Skipping airdrop %s: no contract address", airdrop.id - ) + logger.warning("airdrop_skip_no_contract", airdrop_id=str(airdrop.id)) continue airdrop_data = self.airdrop_fetcher.get_contract_airdrop( user_contract_address @@ -52,9 +51,7 @@ async def claim_airdrops(self) -> None: airdrop_data = await airdrop_data proofs = self._extract_proofs(airdrop_data) if not proofs: - logger.info( - "Skipping airdrop %s: no proof data available", airdrop.id - ) + logger.info("airdrop_skip_no_proof", airdrop_id=str(airdrop.id)) continue claim_successful = await self._claim_airdrop( @@ -63,25 +60,17 @@ async def claim_airdrops(self) -> None: if claim_successful: self.db_connector.save_claim_data(airdrop.id, airdrop.amount) - logger.info("Airdrop %s claimed successfully.", airdrop.id) + logger.info("airdrop_claimed", airdrop_id=str(airdrop.id)) except ValueError as ve: - logger.error("Invalid data for airdrop %s: %s", airdrop.id, ve) + logger.error("airdrop_invalid_data", airdrop_id=str(airdrop.id), error=str(ve)) except SQLAlchemyError as db_err: - logger.error( - "Database error while updating claim data for airdrop %s: %s", - airdrop.id, - db_err, - ) + logger.error("airdrop_db_error", airdrop_id=str(airdrop.id), error=str(db_err)) except ConnectionError as ce: - logger.error( - "Network connection error during claim for airdrop %s: %s", - airdrop.id, - ce, - ) + logger.error("airdrop_connection_error", airdrop_id=str(airdrop.id), error=str(ce)) except Timeout as te: - logger.error("Timeout during claim for airdrop %s: %s", airdrop.id, te) + logger.error("airdrop_timeout", airdrop_id=str(airdrop.id), error=str(te)) except Exception as e: - logger.error("Unexpected error claiming airdrop %s: %s", airdrop.id, e) + logger.error("airdrop_unexpected_error", airdrop_id=str(airdrop.id), error=str(e)) @staticmethod def _extract_proofs(airdrop_data): @@ -113,32 +102,22 @@ async def _claim_airdrop(self, contract_address: str, proofs: List[str]) -> bool # NOTE: Soroban contract invocation for airdrop claiming pending # the deployment of the claim contract on the target network. logger.info( - "Airdrop claim for %s with %d proofs sent (mock implementation)", - contract_address, - len(proofs), + "airdrop_mock_claim_sent", + contract_address=contract_address, + proof_count=len(proofs), ) return True except ConnectionError as ce: - logger.error( - "Network connection failed for address %s: %s", contract_address, ce - ) + logger.error("airdrop_claim_network_error", contract_address=contract_address, error=str(ce)) return False except Timeout as te: - logger.error( - "Timeout during claim for address %s: %s", contract_address, te - ) + logger.error("airdrop_claim_timeout", contract_address=contract_address, error=str(te)) return False except ValueError as ve: - logger.error( - "Invalid data format for calldata during claim for address %s: %s", - contract_address, - ve, - ) + logger.error("airdrop_claim_invalid_data", contract_address=contract_address, error=str(ve)) return False except Exception as e: - logger.error( - "Unexpected error claiming address %s: %s", contract_address, e - ) + logger.error("airdrop_claim_unexpected_error", contract_address=contract_address, error=str(e)) return False diff --git a/quantara/web_app/tests/test_structured_logging.py b/quantara/web_app/tests/test_structured_logging.py new file mode 100644 index 000000000..91262a377 --- /dev/null +++ b/quantara/web_app/tests/test_structured_logging.py @@ -0,0 +1,183 @@ +""" +Unit tests for structured JSON logging (web_app/utils/logger.py). + +Covers: +- mask_wallet_id helper +- _mask_wallet_processor structlog processor +- configure_logging() produces JSON in prod, text in dev +- Every log entry carries timestamp, level, logger, request_id, message +- request_id middleware attaches / passes through X-Request-Id header +""" + +import importlib +import io +import json +import logging +import os +import uuid +from unittest.mock import patch + +import pytest +import structlog +import structlog.contextvars + +from web_app.utils.logger import get_logger, mask_wallet_id, _mask_wallet_processor + + +# ── mask_wallet_id ───────────────────────────────────────────────────────────── + +class TestMaskWalletId: + def test_masks_middle_of_stellar_key(self): + key = "GABCDEFGHIJKLMNOPQRSTUVWXYZ234567ABCDEFGHIJKLMNOPQRSTUVWXYZ" + result = mask_wallet_id(key) + assert result.startswith("GABCDE") + assert result.endswith(key[-4:]) + assert "****" in result + + def test_short_wallet_returns_stars(self): + assert mask_wallet_id("short") == "****" + + def test_empty_string_returns_stars(self): + assert mask_wallet_id("") == "****" + + def test_exact_12_chars_is_masked(self): + key = "GABCDEFGHIJK" + result = mask_wallet_id(key) + assert result.startswith("GABCDE") + assert result.endswith("HIJK") + assert "****" in result + + +# ── _mask_wallet_processor ──────────────────────────────────────────────────── + +class TestMaskWalletProcessor: + def test_wallet_id_field_is_masked(self): + stellar_key = "G" + "A" * 55 + event_dict = {"event": "test", "wallet_id": stellar_key} + result = _mask_wallet_processor(None, "info", event_dict) + assert result["wallet_id"] != stellar_key + assert "****" in result["wallet_id"] + + def test_stellar_key_in_message_is_masked(self): + stellar_key = "G" + "A" * 55 + event_dict = {"event": f"User {stellar_key} logged in"} + result = _mask_wallet_processor(None, "info", event_dict) + assert stellar_key not in result["event"] + assert "****" in result["event"] + + def test_no_wallet_id_field_passes_through(self): + event_dict = {"event": "no wallet here", "level": "info"} + result = _mask_wallet_processor(None, "info", event_dict) + assert result["event"] == "no wallet here" + + def test_non_stellar_text_not_modified(self): + event_dict = {"event": "regular log message"} + result = _mask_wallet_processor(None, "info", event_dict) + assert result["event"] == "regular log message" + + +# ── configure_logging JSON mode ─────────────────────────────────────────────── + +class TestConfigureLoggingJson: + def test_json_output_in_prod(self): + """In PROD mode log records must be valid JSON with required fields.""" + with patch.dict(os.environ, {"ENV_VERSION": "PROD"}): + # Re-import to pick up env change + import web_app.utils.logger as log_mod + importlib.reload(log_mod) + log_mod.configure_logging() + + buf = io.StringIO() + handler = logging.StreamHandler(buf) + + # Attach a fresh JSON formatter to capture output + import structlog.stdlib + formatter = structlog.stdlib.ProcessorFormatter( + processor=structlog.processors.JSONRenderer(), + foreign_pre_chain=[ + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso", utc=True), + ], + ) + handler.setFormatter(formatter) + test_logger = logging.getLogger("test_json_logger") + test_logger.handlers = [handler] + test_logger.setLevel(logging.DEBUG) + test_logger.propagate = False + + test_logger.info("test_event", extra={"_logger": "test_json_logger"}) + buf.seek(0) + line = buf.readline().strip() + if line: + data = json.loads(line) + assert "timestamp" in data or "event" in data # structlog adds both + + # Restore dev mode for subsequent tests + importlib.reload(log_mod) + log_mod.configure_logging() + + def test_json_contains_required_fields(self): + """JSON log entries must contain timestamp, level, and event.""" + buf = io.StringIO() + + import structlog.stdlib + formatter = structlog.stdlib.ProcessorFormatter( + processor=structlog.processors.JSONRenderer(), + foreign_pre_chain=[ + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso", utc=True), + structlog.stdlib.add_logger_name, + ], + ) + handler = logging.StreamHandler(buf) + handler.setFormatter(formatter) + + log = logging.getLogger("test_required_fields") + log.handlers = [handler] + log.propagate = False + log.setLevel(logging.DEBUG) + + log.info("required_fields_check") + buf.seek(0) + raw = buf.read().strip() + if raw: + data = json.loads(raw) + assert "level" in data + assert "timestamp" in data + + +# ── get_logger ──────────────────────────────────────────────────────────────── + +class TestGetLogger: + def test_returns_bound_logger(self): + log = get_logger("test.module") + assert log is not None + + def test_different_names_return_different_loggers(self): + log1 = get_logger("module.one") + log2 = get_logger("module.two") + # Both are valid; they may be the same structlog factory but bound differently + assert log1 is not None + assert log2 is not None + + +# ── request_id middleware ───────────────────────────────────────────────────── + +class TestRequestIdMiddleware: + def test_request_id_header_returned(self, client): + """Every response must carry an X-Request-Id header.""" + response = client.get("/health") + assert "x-request-id" in response.headers + + def test_custom_request_id_echoed(self, client): + """If client sends X-Request-Id the same value is returned.""" + custom_id = str(uuid.uuid4()) + response = client.get("/health", headers={"X-Request-Id": custom_id}) + assert response.headers.get("x-request-id") == custom_id + + def test_generated_request_id_is_valid_uuid(self, client): + """Auto-generated request IDs must be valid UUIDs.""" + response = client.get("/health") + request_id = response.headers.get("x-request-id", "") + # Should not raise + uuid.UUID(request_id) diff --git a/quantara/web_app/utils/__init__.py b/quantara/web_app/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/quantara/web_app/utils/logger.py b/quantara/web_app/utils/logger.py new file mode 100644 index 000000000..2cd3795e7 --- /dev/null +++ b/quantara/web_app/utils/logger.py @@ -0,0 +1,90 @@ +""" +Structured logging setup for the Quantara backend. + +- Production (ENV_VERSION=PROD): JSON output via structlog + python-json-logger +- Development: human-readable coloured console output +- Every log entry carries: timestamp, level, logger, request_id, message +- wallet_id is always masked (first 6 + last 4 chars, middle replaced with ****) +""" + +import logging +import logging.config +import os +import re + +import structlog + +_IS_PROD = os.getenv("ENV_VERSION", "").upper() == "PROD" + +# ── wallet masking ──────────────────────────────────────────────────────────── +_STELLAR_RE = re.compile(r"\bG[A-Z2-7]{55}\b") + + +def mask_wallet_id(wallet_id: str) -> str: + """Return a masked version of a Stellar public key for safe logging.""" + if not wallet_id or len(wallet_id) < 12: + return "****" + return f"{wallet_id[:6]}****{wallet_id[-4:]}" + + +def _mask_wallet_processor( + logger: object, method: str, event_dict: dict +) -> dict: + """structlog processor: mask any wallet_id field and scrub keys in values.""" + if "wallet_id" in event_dict: + event_dict["wallet_id"] = mask_wallet_id(str(event_dict["wallet_id"])) + # Also mask any Stellar public keys that leaked into the message string + if "event" in event_dict and isinstance(event_dict["event"], str): + event_dict["event"] = _STELLAR_RE.sub( + lambda m: mask_wallet_id(m.group()), event_dict["event"] + ) + return event_dict + + +# ── shared processors ───────────────────────────────────────────────────────── +_SHARED_PROCESSORS: list = [ + structlog.contextvars.merge_contextvars, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso", utc=True), + _mask_wallet_processor, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.StackInfoRenderer(), +] + + +def configure_logging() -> None: + """Configure structlog + stdlib logging. Call once at application startup.""" + if _IS_PROD: + renderer = structlog.processors.JSONRenderer() + else: + renderer = structlog.dev.ConsoleRenderer(colors=True) + + structlog.configure( + processors=_SHARED_PROCESSORS + + [ + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) + + formatter = structlog.stdlib.ProcessorFormatter( + processor=renderer, + foreign_pre_chain=_SHARED_PROCESSORS, + ) + + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(logging.INFO) + + +def get_logger(name: str) -> structlog.stdlib.BoundLogger: + """Return a structlog logger bound to *name*.""" + return structlog.get_logger(name)