Skip to content

Commit 89dcdc7

Browse files
committed
working-sse-with-n8n-and-authentication
1 parent 7a0dd69 commit 89dcdc7

1 file changed

Lines changed: 81 additions & 84 deletions

File tree

spendee/spendee_mcp.py

Lines changed: 81 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,33 @@
66
from fastapi import FastAPI, HTTPException, Request
77
from mcp.server.fastmcp import FastMCP
88
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
9-
import contextlib
10-
9+
from starlette.routing import Route, Mount
1110
from starlette.applications import Starlette
12-
from starlette.routing import Mount
13-
14-
from mcp.server.fastmcp import FastMCP
11+
from starlette.requests import Request
12+
from starlette.responses import Response
13+
from starlette.types import Scope, Receive, Send
14+
from mcp.server.sse import SseServerTransport
15+
import hashlib
16+
import secrets
1517

1618
# to start (after .venv setup):
1719
# python spendee/spendee_mcp.py
1820

1921
# to test:
2022
# mcp dev spendee/spendee_mcp.py
2123
# Then on the url: http://localhost:6274/
22-
# setup "Transport Type" to "Streamable HTTP"
23-
# and "Server URL" to "http://localhost:8000/mcp"
24+
# setup for "Transport Type": "Streamable HTTP"
25+
# "Server URL" to "http://localhost:8000/mcp"
26+
# setup for "Transport Type": "SSE"
27+
# "Server URL" to "http://localhost:8000/sse"
2428

2529
ACCEPTED_TOKEN = os.environ.get("MCP_TOKEN", "spendee-token")
2630
PORT = int(os.environ.get("MCP_PORT", 8000))
2731
DEBUG_MODE = os.environ.get("DEBUG_MODE", "") != ""
28-
DISABLE_AUTH = os.environ.get("DISABLE_AUTH", "") != ""
32+
TRANSFER_MODE = os.environ.get("TRANSFER_MODE", "sse").lower()
33+
34+
if TRANSFER_MODE not in ["sse", "streamable-http"]:
35+
raise ValueError("TRANSFER_MODE must be either 'sse' or 'streamable-http'")
2936

3037
logging.basicConfig(level=logging.DEBUG)
3138
logger = logging.getLogger(__name__)
@@ -40,30 +47,40 @@ def get_wallets():
4047
{"id": 2, "name": "Savings", "currency": "EUR", "balance": 7890.12},
4148
]
4249

50+
# Authentication middleware and server setup
4351

44-
def server_with_authentication():
45-
session_manager = StreamableHTTPSessionManager(
46-
app=mcp._mcp_server, # Use the underlying MCPServer instance
47-
)
52+
async def check_bearer_auth(request, error_response=None):
53+
if DEBUG_MODE:
54+
logger.debug(f"Incoming request: method={request.method}, url={request.url}")
55+
logger.debug(f"Request headers: {dict(request.headers)}")
4856

49-
async def auth_middleware(scope, receive, send):
50-
request = Request(scope, receive)
51-
# Log request method, url, and headers for troubleshooting
52-
if DEBUG_MODE:
53-
logger.debug(f"Incoming request: method={request.method}, url={request.url}")
54-
logger.debug(f"Request headers: {dict(request.headers)}")
57+
auth_header = request.headers.get("authorization")
58+
if not auth_header or not auth_header.lower().startswith("bearer "):
59+
logger.warning("Missing or invalid Authorization header.")
60+
if error_response:
61+
return await error_response("Missing or invalid Authorization header.", 401)
62+
raise HTTPException(401, "Missing or invalid Authorization header.")
5563

56-
auth_header = request.headers.get("authorization")
64+
token = auth_header.split(" ", 1)[1]
65+
if token != ACCEPTED_TOKEN:
66+
logger.warning("Invalid token.")
67+
logger.debug(f"Expected token: '{ACCEPTED_TOKEN}', received token: '{token}'")
68+
if error_response:
69+
return await error_response("Invalid token.", 401)
70+
raise HTTPException(401, "Invalid token.")
71+
return None
5772

58-
if not auth_header or not auth_header.lower().startswith("bearer "):
59-
logger.warning("Missing or invalid Authorization header.")
60-
raise HTTPException(401, "Missing or invalid Authorization header.")
6173

62-
token = auth_header.split(" ", 1)[1]
63-
if token != ACCEPTED_TOKEN:
64-
logger.warning("Invalid or expired token.")
65-
raise HTTPException(401, "Invalid or expired token.")
74+
def streaming_server():
75+
session_manager = StreamableHTTPSessionManager(
76+
app=mcp._mcp_server,
77+
)
6678

79+
async def auth_middleware(scope, receive, send):
80+
request = Request(scope, receive)
81+
err = await check_bearer_auth(request)
82+
if err:
83+
return
6784
await session_manager.handle_request(scope, receive, send)
6885

6986
@contextlib.asynccontextmanager
@@ -80,59 +97,36 @@ async def lifespan(app: FastAPI):
8097
logger.info(f"Access the MCP endpoint at http://0.0.0.0:{PORT}/mcp")
8198
uvicorn.run(app, host="0.0.0.0", port=PORT)
8299

83-
def server_with_sse():
84100

85-
# Persistent SSE transport instance
86-
sse_transport = mcp._sse_transport if hasattr(mcp, "_sse_transport") else None
87-
if not sse_transport:
88-
# Create and cache the transport instance
89-
from mcp.server.sse import SseServerTransport
90-
sse_transport = SseServerTransport("/messages/")
91-
mcp._sse_transport = sse_transport
101+
def sse_server():
102+
sse_transport = SseServerTransport("/messages/")
103+
104+
async def error_response(msg, status):
105+
response = Response(msg, status_code=status)
106+
async def responder(scope, receive, send):
107+
await response(scope, receive, send)
108+
return responder
92109

93-
from starlette.requests import Request
94-
from starlette.responses import Response
95-
from starlette.types import Scope, Receive, Send
110+
async def handle_sse(request):
111+
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams:
112+
await mcp._mcp_server.run(streams[0], streams[1], mcp._mcp_server.create_initialization_options())
113+
return Response()
114+
115+
routes = [
116+
Route("/sse", endpoint=handle_sse, methods=["GET"]),
117+
Mount("/messages/", app=sse_transport.handle_post_message),
118+
]
119+
starlette_sse_app = Starlette(routes=routes)
96120

97121
async def sse_auth_middleware(scope: Scope, receive: Receive, send: Send):
98122
request = Request(scope, receive)
99-
if DEBUG_MODE:
100-
logger.debug(f"Incoming request: method={request.method}, url={request.url}")
101-
logger.debug(f"Request headers: {dict(request.headers)}")
102-
103-
auth_header = request.headers.get("authorization")
104-
if not auth_header or not auth_header.lower().startswith("bearer "):
105-
logger.warning("Missing or invalid Authorization header.")
106-
response = Response("Missing or invalid Authorization header.", status_code=401)
107-
return await response(scope, receive, send)
108-
109-
token = auth_header.split(" ", 1)[1]
110-
if token != ACCEPTED_TOKEN:
111-
logger.warning("Invalid or expired token.")
112-
response = Response("Invalid or expired token.", status_code=401)
113-
return await response(scope, receive, send)
114-
115-
# If auth passes, route to the persistent SSE transport
116-
# Mount /sse and /messages/ endpoints
117-
from starlette.routing import Route, Mount
118-
from starlette.applications import Starlette
119-
120-
# Only create the app once
121-
if not hasattr(mcp, "_starlette_sse_app"):
122-
async def handle_sse(request):
123-
async with sse_transport.connect_sse(request.scope, request.receive, request._send) as streams:
124-
await mcp._mcp_server.run(streams[0], streams[1], mcp._mcp_server.create_initialization_options())
125-
return Response()
126-
127-
routes = [
128-
Route("/sse", endpoint=handle_sse, methods=["GET"]),
129-
Mount("/messages/", app=sse_transport.handle_post_message),
130-
]
131-
mcp._starlette_sse_app = Starlette(routes=routes)
132-
133-
await mcp._starlette_sse_app(scope, receive, send)
134-
135-
# Mount the auth middleware at root
123+
err = await check_bearer_auth(request, error_response)
124+
if err:
125+
await err(scope, receive, send)
126+
return
127+
128+
await starlette_sse_app(scope, receive, send)
129+
136130
app = Starlette(
137131
routes=[
138132
Mount("/", sse_auth_middleware),
@@ -142,16 +136,19 @@ async def handle_sse(request):
142136

143137

144138
if __name__ == "__main__":
145-
logger.info("Starting Spendee MCP Server as SSE without authentication")
146-
# for n8n compatibility, authentication implemented on cloudflare level
147-
server_with_sse()
148-
149-
# if DISABLE_AUTH:
150-
# logger.warning("Running without authentication! This is insecure and should only be used for local testing.")
151-
# #mcp.run(transport="streamable-http")
152-
# mcp.run(transport="sse")
153-
# else:
154-
# server_with_authentication()
139+
logger.info("Starting Spendee MCP Server")
140+
salt = secrets.token_hex(5)
141+
token_hash = hashlib.sha256((salt + ACCEPTED_TOKEN).encode()).hexdigest()
142+
logger.debug(f"sha256('{salt}' + token): {token_hash}")
143+
logger.debug(f"You can verify with: echo -n \"{salt}$MCP_TOKEN\" | sha256sum")
144+
145+
# I failed to unify both transfer modes, because of some lifecycle issues
146+
if TRANSFER_MODE == "streamable-http":
147+
logger.info("Using Streamable HTTP transport")
148+
streaming_server()
149+
else:
150+
logger.info("Using SSE transport")
151+
sse_server()
155152

156153

157154
# relevant URLs for learning:

0 commit comments

Comments
 (0)