Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quantara/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ faker = "^30.8.1"
notebook = "^7.2.2"
sentry-sdk = {extras = ["fastapi"], version = "^2.18.0"}
slowapi = "^0.1.9"
structlog = ">=24.1.0"
[tool.poetry.group.dev.dependencies]
black = "24.8.0"
isort = "5.13.2"
Expand Down
27 changes: 22 additions & 5 deletions quantara/web_app/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
authentication endpoints, and exposes a /health endpoint for CI orchestration.
"""

import logging
import os
import asyncio
import uuid
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, Response, Depends
Expand All @@ -35,8 +35,11 @@
from web_app.api.middleware import MaxBodySizeMiddleware
from web_app.db.database import init_db
from web_app.db.database import init_db, get_database
from web_app.utils.logger import configure_logging, get_logger
import structlog

logger = logging.getLogger(__name__)
configure_logging()
logger = get_logger(__name__)
DEFAULT_CORS_ORIGINS = ["http://localhost:3000"]
CORS_ALLOW_METHODS = ["GET", "POST"]
CORS_ALLOW_HEADERS = ["Content-Type", "Authorization", "X-Wallet-Id", "X-Nonce", "X-Signature"]
Expand All @@ -63,6 +66,7 @@ async def lifespan(app: FastAPI):
Handles application startup and shutdown events.
"""
init_db()
configure_logging()

# Validate required environment variables at startup.
assert_valid_config()
Expand Down Expand Up @@ -117,7 +121,10 @@ async def global_exception_handler(request: Request, exc: Exception):
:return: JSON response with a generic error message and 500 status.
"""
logger.exception(
"Unhandled exception on %s %s", request.method, request.url.path
"unhandled_exception",
method=request.method,
path=request.url.path,
request_id=structlog.contextvars.get_contextvars().get("request_id", "-"),
)
return JSONResponse(
status_code=500,
Expand All @@ -143,6 +150,16 @@ async def global_exception_handler(request: Request, exc: Exception):
app.add_middleware(MaxBodySizeMiddleware, max_body_size=1024*1024)


@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
"""Attach a unique request_id to every structlog context."""
request_id = request.headers.get("X-Request-Id", str(uuid.uuid4()))
with structlog.contextvars.bound_contextvars(request_id=request_id):
response = await call_next(request)
response.headers["X-Request-Id"] = request_id
return response


@app.get("/health", tags=["Health"], summary="Health check endpoint")
async def health_check(response: Response, db: Session = Depends(get_database)):
"""Returns 200 OK when the service is running and dependencies are healthy."""
Expand All @@ -156,7 +173,7 @@ async def health_check(response: Response, db: Session = Depends(get_database)):
asyncio.to_thread(db.execute, text("SELECT 1")), timeout=2.0
)
except Exception as e:
logger.error(f"Database health check failed: {e}")
logger.error("health_check_db_failed", error=str(e))
health_status["database"] = "down"
is_healthy = False

Expand All @@ -167,7 +184,7 @@ async def health_check(response: Response, db: Session = Depends(get_database)):
await asyncio.wait_for(r.ping(), timeout=2.0)
await r.close()
except Exception as e:
logger.error(f"Redis health check failed: {e}")
logger.error("health_check_redis_failed", error=str(e))
health_status["redis"] = "down"
is_healthy = False

Expand Down
2 changes: 2 additions & 0 deletions quantara/web_app/api/position.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
from web_app.contract_tools.blockchain_call import StellarClient
from web_app.db.models import Status, TransactionStatus
from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT, READ_LIMIT
from web_app.utils.logger import get_logger

router = APIRouter()
logger = get_logger(__name__)
position_db_connector = PositionDBConnector()
transaction_db_connector = TransactionDBConnector()

Expand Down
5 changes: 4 additions & 1 deletion quantara/web_app/api/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
)

from web_app.api.rate_limiter import limiter, WRITE_LIMIT, READ_LIMIT
from web_app.utils.logger import get_logger

_logger = get_logger(__name__)

# Create a FastAPI router for handling Telegram webhook requests
router = APIRouter()
Expand Down Expand Up @@ -98,7 +101,7 @@ async def telegram_webhook(update: Update):
result = await dp.feed_webhook_update(bot, update, db=db_connector)
return build_multipart_response(bot, result)
except (ValueError, KeyError, TypeError, AttributeError) as e:
logger.error(f"Error processing Telegram update {update.update_id}: {e}")
_logger.error("telegram_update_error", update_id=update.update_id, error=str(e))
return b"", 200


Expand Down
9 changes: 5 additions & 4 deletions quantara/web_app/api/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
)

from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT, READ_LIMIT
from web_app.utils.logger import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)
router = APIRouter() # Initialize the router
telegram_db = TelegramUserDBConnector()

Expand Down Expand Up @@ -254,7 +255,7 @@ async def get_stats(request: Request) -> GetStatsResponse:
for token, amount in token_amounts.items():
# Skip if no price available for the token
if token not in current_prices:
logger.warning(f"No price data available for {token}")
logger.warning("no_price_data", token=token)
continue

# If the token is USDC, use it directly
Expand All @@ -276,7 +277,7 @@ async def get_stats(request: Request) -> GetStatsResponse:
)

except Exception as e:
logger.exception("Error in get_stats")
logger.error("get_stats_error", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error")


Expand Down Expand Up @@ -335,6 +336,6 @@ async def save_bug_report(request: Request, report: BugReportRequest) -> BugRepo
if isinstance(e, HTTPException):
raise

logger.error(f"Failed to submit bug report: {str(e)}")
logger.error("bug_report_failed", error=str(e))
sentry_sdk.capture_exception(e)
raise HTTPException(status_code=500, detail="Failed to submit bug report")
7 changes: 4 additions & 3 deletions quantara/web_app/api/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
VaultDepositResponse,
)
from web_app.api.rate_limiter import limiter, WRITE_LIMIT, USER_DATA_LIMIT
from web_app.utils.logger import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)
router = APIRouter(prefix="/api/vault", tags=["vault"])


Expand All @@ -34,7 +35,7 @@ async def deposit_to_vault(

Requires wallet signature authentication via X-Wallet-Id, X-Nonce, and X-Signature headers.
"""
logger.info(f"Processing deposit request for wallet {body.wallet_id}")
logger.info("vault_deposit_request", wallet_id=body.wallet_id)

try:
user_db = UserDBConnector()
Expand All @@ -53,7 +54,7 @@ async def deposit_to_vault(
symbol=body.symbol,
)
except (ValueError, TypeError) as e:
logger.error(f"Invalid input data: {str(e)}")
logger.error("vault_deposit_invalid_input", error=str(e))
raise HTTPException(status_code=400, detail=str(e))


Expand Down
42 changes: 12 additions & 30 deletions quantara/web_app/contract_tools/blockchain_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import aiohttp

from .constants import MULTIPLIER_POWER, TokenParams
from web_app.utils.logger import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)

# Base64-encoded "wasm_hash" key for Soroban getContractData RPC calls
_SOROBAN_WASM_HASH_KEY = "dHJ1c3RlZAB3YXNoX2hhc2g="
Expand Down Expand Up @@ -75,22 +76,17 @@ async def get_balance(
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 404:
logger.info(
"Account %s not found on Stellar network",
holder_address,
)
logger.info("horizon_account_not_found", account=holder_address)
return "0"
if response.status != 200:
logger.warning(
"Horizon returned %d for %s", response.status, url
)
logger.warning("horizon_unexpected_status", status=response.status, url=url)
return "0"
account = await response.json()
except aiohttp.ClientError as exc:
logger.error("Network error fetching account %s: %s", holder_address, exc)
logger.error("horizon_network_error", account=holder_address, error=str(exc))
return "0"
except (ValueError, KeyError, TypeError) as exc:
logger.error("Data error fetching account %s: %s", holder_address, exc)
logger.error("horizon_data_error", account=holder_address, error=str(exc))
return "0"

code = asset_code.lower()
Expand Down Expand Up @@ -129,9 +125,7 @@ async def get_token_balances(
)
balances[token.name] = bal
except (aiohttp.ClientError, ValueError, KeyError) as exc:
logger.info(
"Failed to get balance for %s: %s", token.name, exc
)
logger.info("token_balance_fetch_failed", token=token.name, error=str(exc))
return balances

# ------------------------------------------------------------------ #
Expand Down Expand Up @@ -213,24 +207,16 @@ async def is_contract_deployed(self, contract_id: str) -> bool:
if response.status == 200:
data = await response.json()
if "error" in data:
logger.warning(
"RPC error checking contract %s: %s",
contract_id,
data["error"],
)
logger.warning("rpc_contract_error", contract_id=contract_id, error=data["error"])
return False
return "result" in data
logger.warning(
"RPC returned %d checking contract %s",
response.status,
contract_id,
)
logger.warning("rpc_unexpected_status", status=response.status, contract_id=contract_id)
return False
except aiohttp.ClientError as e:
logger.error("Network error checking contract %s: %s", contract_id, e)
logger.error("rpc_network_error", contract_id=contract_id, error=str(e))
return False
except (ValueError, KeyError, TypeError) as e:
logger.error("Data error checking contract %s: %s", contract_id, e)
logger.error("rpc_data_error", contract_id=contract_id, error=str(e))
return False

async def fetch_portfolio(self, contract_address: str) -> dict:
Expand All @@ -252,11 +238,7 @@ async def fetch_portfolio(self, contract_address: str) -> dict:
"decimals": token.decimals,
}
except (aiohttp.ClientError, ValueError, KeyError) as exc:
logger.info(
"Failed to get portfolio balance for %s: %s",
token.name,
exc,
)
logger.info("portfolio_balance_fetch_failed", token=token.name, error=str(exc))
return results


Expand Down
7 changes: 4 additions & 3 deletions quantara/web_app/db/crud/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@

from web_app.db.database import get_database_url
from web_app.db.models import AirDrop, Base
from web_app.utils.logger import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ModelType = TypeVar("ModelType", bound=Base)


Expand Down Expand Up @@ -71,7 +72,7 @@ def get_object(
try:
return db.query(model).filter(model.id == obj_id).first()
except SQLAlchemyError as e:
logger.error("Failed to get object by id: %s", e)
logger.error("db_get_object_failed", error=str(e))
return None
finally:
db.close()
Expand Down Expand Up @@ -127,7 +128,7 @@ def delete_object(self, object: Base) -> None:

except SQLAlchemyError as e:
db.rollback()
logger.error(f"Error deleting object: {e}")
logger.error("db_delete_object_failed", error=str(e))

finally:
db.close()
Expand Down
5 changes: 3 additions & 2 deletions quantara/web_app/db/crud/deposit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from web_app.db.models import Base, User, Vault

from .base import DBConnector
from web_app.utils.logger import get_logger

logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ModelType = TypeVar("ModelType", bound=Base)


Expand Down Expand Up @@ -45,7 +46,7 @@ def get_vault(self, wallet_id: str, symbol: str) -> Vault | None:
with self.Session() as db:
user = self.get_object_by_field(User, "wallet_id", wallet_id)
if not user:
logger.error(f"User with wallet id {wallet_id} not found")
logger.error("db_get_vault_user_not_found", wallet_id=wallet_id)
return None
vault = db.query(Vault).filter_by(user_id=user.id, symbol=symbol).first()
return vault
Expand Down
Loading
Loading