diff --git a/webapp/Dockerfile b/webapp/Dockerfile index 3252d01..59a758e 100644 --- a/webapp/Dockerfile +++ b/webapp/Dockerfile @@ -2,6 +2,7 @@ # SPDX-License-Identifier: CC0-1.0 FROM alpine:3.20 RUN apk add --no-cache py3-aiosqlite py3-jinja2 py3-starlette py3-uvloop uvicorn +RUN apk add --no-cache lz4 tcpdump COPY . /webapp WORKDIR /webapp CMD ["uvicorn", "--host", "0.0.0.0", "--timeout-graceful-shutdown", "1", "main:app"] diff --git a/webapp/main.py b/webapp/main.py index f2b1288..965e260 100644 --- a/webapp/main.py +++ b/webapp/main.py @@ -6,6 +6,7 @@ import asyncio import base64 import contextlib +import itertools import json import time from pathlib import Path @@ -20,6 +21,8 @@ from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates +from pcap import stream_pcaps_with_bpf + def row_to_dict(row: aiosqlite.Row) -> dict: row_dict = dict(row) @@ -181,25 +184,59 @@ async def api_flow_pcap_get(request): # Query flow start timestamp from database async with eve_db.execute( - "SELECT ts_start FROM flow WHERE id = ?", (flow_id,) + "SELECT ts_start, ts_end, src_ip, src_port, dest_ip, dest_port FROM flow WHERE id = ?", + (flow_id,), ) as cursor: flow = await cursor.fetchone() if not flow: raise HTTPException(404) - flow_us = flow["ts_start"] // 1000 + + # convert μs to seconds + flow_start_secs = flow["ts_start"] // 1000_000 + flow_end_secs = flow["ts_end"] // 1000_000 + + # Build the BPF filter for this flow + src_ip = flow["src_ip"].strip("[]") + src_port = flow["src_port"] + dst_ip = flow["dest_ip"].strip("[]") + dst_port = flow["dest_port"] + bpf_filter = ( + f"host {src_ip} and host {dst_ip} and " f"port {src_port} and port {dst_port}" + ) # Serve corresponding pcap, found using timestamp - path = None - for pcap_path in sorted(Path("../suricata/output/pcaps/").glob("*.*")): - pcap_us = int(pcap_path.name.replace(".lz4", "").rsplit(".", 1)[-1]) - if pcap_us * 1000 > flow_us: - break # take previous one - path = pcap_path - if path is None: + pcaps = [] + pcap_paths = list(sorted(Path("../suricata/output/pcaps/").glob("*.*"))) + timestamps = [ + int(str(pcap_path).replace(".lz4", "").rsplit(".", 1)[-1]) + for pcap_path in pcap_paths + ] + + # create pairs of (start, end) timestamps for each pcap + start_end_stamps = list(itertools.zip_longest(timestamps, timestamps[1:])) + + # Find pcaps overlapping with the flow's time range + for pcap_path, pcap_start_end in zip(pcap_paths, start_end_stamps): + pcap_start, pcap_end = pcap_start_end + # Check if flow overlaps with this PCAP's time range + # For the last PCAP (pcap_end is None), only check if flow ends after pcap starts + if pcap_end is None: + # Last PCAP: include if flow hasn't ended before this PCAP started + if flow_end_secs > pcap_start: + pcaps.append(pcap_path) + else: + # Normal overlap check for PCAPs with known end time + if flow_start_secs < pcap_end and flow_end_secs > pcap_start: + pcaps.append(pcap_path) + + if not pcaps: raise HTTPException(404) - return Response( - path.open("rb").read(), # cache before sending as file might change - headers={"Content-Disposition": f'attachment; filename="{path.name}"'}, + + filename = f"flow-{flow_id}.pcap" + return StreamingResponse( + stream_pcaps_with_bpf(pcaps, bpf_filter), + media_type="application/vnd.tcpdump.pcap", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) diff --git a/webapp/pcap.py b/webapp/pcap.py new file mode 100644 index 0000000..7ef479a --- /dev/null +++ b/webapp/pcap.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# Copyright (C) 2025 Yun Zheng Hu +# SPDX-License-Identifier: GPL-2.0-or-later +import asyncio +import io +import logging +from pathlib import Path +from typing import AsyncIterable, BinaryIO, Iterable + +PCAP_HEADER_SIZE = 24 + +log = logging.getLogger(__name__) + + +async def pipe_data(reader: asyncio.StreamReader, fout: BinaryIO, skip: int = 0): + """Pipe data from reader to fout, skipping initial bytes if specified.""" + while True: + chunk = await reader.read(io.DEFAULT_BUFFER_SIZE) + if not chunk: + break + if skip: + chunk = chunk[skip:] + skip = 0 + fout.write(chunk) + + +async def stream_pcaps(pcaps: Iterable[str | Path], fout: BinaryIO) -> None: + """Stream multiple pcap files to the given output file-like object.""" + for i, pcap in enumerate(pcaps): + log.debug(pcap) + match Path(pcap).suffix.lower(): + case ".lz4": + args = ["lz4cat", str(pcap)] + case ".gz": + args = ["zcat", str(pcap)] + case ".pcap" | ".pcapng": + args = ["cat", str(pcap)] + case _: + raise ValueError(f"Unknown file extension for pcap: {pcap}") + + log.debug("stream_pcaps args=%r", args) + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + ) + skip = PCAP_HEADER_SIZE if i > 0 else 0 + async with asyncio.TaskGroup() as tg: + tg.create_task(pipe_data(process.stdout, fout, skip=skip)) + + log.debug("finished stream_pcaps") + + +async def stream_pcaps_with_bpf( + pcaps: list[str | Path], bpf_filter: str +) -> AsyncIterable[bytes]: + """Stream pcap data through tcpdump with BPF filter applied.""" + # Run tcpdump with reading from stdin and writing to stdout using the BPF filter + args = ["tcpdump", "-r", "-", "-w", "-", bpf_filter] + log.debug("stream_pcaps_with_bpf args=%r", args) + process = await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + + # create task to stream pcaps into tcpdump's stdin + stream_task = asyncio.create_task(stream_pcaps(pcaps, process.stdin)) + stream_task.add_done_callback(lambda t: process.stdin.close()) + + # read tcpdump's stdout and yield chunks + while True: + chunk = await process.stdout.read(io.DEFAULT_BUFFER_SIZE) + if not chunk: + break + yield chunk