From 9b2a5c6b5783b0aa6acbe1426ddbcc0b156ed8e0 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Fri, 21 Nov 2025 16:44:12 +0100 Subject: [PATCH 1/2] webapp: improve pcap download --- webapp/Dockerfile | 1 + webapp/main.py | 62 ++++++++++++++++++++++++++++++++-------- webapp/pcap.py | 73 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 12 deletions(-) create mode 100644 webapp/pcap.py diff --git a/webapp/Dockerfile b/webapp/Dockerfile index 3252d01..59a758e 100644 --- a/webapp/Dockerfile +++ b/webapp/Dockerfile @@ -2,6 +2,7 @@ # SPDX-License-Identifier: CC0-1.0 FROM alpine:3.20 RUN apk add --no-cache py3-aiosqlite py3-jinja2 py3-starlette py3-uvloop uvicorn +RUN apk add --no-cache lz4 tcpdump COPY . /webapp WORKDIR /webapp CMD ["uvicorn", "--host", "0.0.0.0", "--timeout-graceful-shutdown", "1", "main:app"] diff --git a/webapp/main.py b/webapp/main.py index f2b1288..aa9aef0 100644 --- a/webapp/main.py +++ b/webapp/main.py @@ -6,6 +6,8 @@ import asyncio import base64 import contextlib +import io +import itertools import json import time from pathlib import Path @@ -20,6 +22,8 @@ from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates +from pcap import stream_pcaps_with_bpf + def row_to_dict(row: aiosqlite.Row) -> dict: row_dict = dict(row) @@ -181,25 +185,59 @@ async def api_flow_pcap_get(request): # Query flow start timestamp from database async with eve_db.execute( - "SELECT ts_start FROM flow WHERE id = ?", (flow_id,) + "SELECT ts_start, ts_end, src_ip, src_port, dest_ip, dest_port FROM flow WHERE id = ?", + (flow_id,), ) as cursor: flow = await cursor.fetchone() if not flow: raise HTTPException(404) - flow_us = flow["ts_start"] // 1000 + + # convert μs to seconds + flow_start_secs = flow["ts_start"] // 1000_000 + flow_end_secs = flow["ts_end"] // 1000_000 + + # Build the BPF filter for this flow + src_ip = flow["src_ip"].strip("[]") + src_port = flow["src_port"] + dst_ip = flow["dest_ip"].strip("[]") + dst_port = flow["dest_port"] + bpf_filter = ( + f"host {src_ip} and host {dst_ip} and " f"port {src_port} and port {dst_port}" + ) # Serve corresponding pcap, found using timestamp - path = None - for pcap_path in sorted(Path("../suricata/output/pcaps/").glob("*.*")): - pcap_us = int(pcap_path.name.replace(".lz4", "").rsplit(".", 1)[-1]) - if pcap_us * 1000 > flow_us: - break # take previous one - path = pcap_path - if path is None: + pcaps = [] + pcap_paths = list(sorted(Path("../suricata/output/pcaps/").glob("*.*"))) + timestamps = [ + int(str(pcap_path).replace(".lz4", "").rsplit(".", 1)[-1]) + for pcap_path in pcap_paths + ] + + # create pairs of (start, end) timestamps for each pcap + start_end_stamps = list(itertools.zip_longest(timestamps, timestamps[1:])) + + # Find pcaps overlapping with the flow's time range + for pcap_path, pcap_start_end in zip(pcap_paths, start_end_stamps): + pcap_start, pcap_end = pcap_start_end + # Check if flow overlaps with this PCAP's time range + # For the last PCAP (pcap_end is None), only check if flow ends after pcap starts + if pcap_end is None: + # Last PCAP: include if flow hasn't ended before this PCAP started + if flow_end_secs > pcap_start: + pcaps.append(pcap_path) + else: + # Normal overlap check for PCAPs with known end time + if flow_start_secs < pcap_end and flow_end_secs > pcap_start: + pcaps.append(pcap_path) + + if not pcaps: raise HTTPException(404) - return Response( - path.open("rb").read(), # cache before sending as file might change - headers={"Content-Disposition": f'attachment; filename="{path.name}"'}, + + filename = f"flow-{flow_id}.pcap" + return StreamingResponse( + stream_pcaps_with_bpf(pcaps, bpf_filter), + media_type="application/vnd.tcpdump.pcap", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) diff --git a/webapp/pcap.py b/webapp/pcap.py new file mode 100644 index 0000000..8de22e5 --- /dev/null +++ b/webapp/pcap.py @@ -0,0 +1,73 @@ +import asyncio +import io +import logging +import sys +from pathlib import Path +from typing import AsyncIterable, BinaryIO, Iterable + +PCAP_HEADER_SIZE = 24 + +log = logging.getLogger(__name__) + + +async def pipe_data(reader: asyncio.StreamReader, fout: BinaryIO, skip: int = 0): + """Pipe data from reader to fout, skipping initial bytes if specified.""" + while True: + chunk = await reader.read(io.DEFAULT_BUFFER_SIZE) + if not chunk: + break + if skip: + chunk = chunk[skip:] + skip = 0 + fout.write(chunk) + + +async def stream_pcaps(pcaps: Iterable[str | Path], fout: BinaryIO) -> None: + """Stream multiple pcap files to the given output file-like object.""" + for i, pcap in enumerate(pcaps): + log.debug(pcap) + match Path(pcap).suffix.lower(): + case ".lz4": + args = ["lz4cat", str(pcap)] + case ".gz": + args = ["zcat", str(pcap)] + case ".pcap" | ".pcapng": + args = ["cat", str(pcap)] + case _: + raise ValueError(f"Unknown file extension for pcap: {pcap}") + + log.debug("stream_pcaps args=%r", args) + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + ) + skip = PCAP_HEADER_SIZE if i > 0 else 0 + async with asyncio.TaskGroup() as tg: + stdout_task = tg.create_task(pipe_data(process.stdout, fout, skip=skip)) + + log.debug("finished stream_pcaps") + + +async def stream_pcaps_with_bpf( + pcaps: list[str | Path], bpf_filter: str +) -> AsyncIterable[bytes]: + """Stream pcap data through tcpdump with BPF filter applied.""" + # Run tcpdump with reading from stdin and writing to stdout using the BPF filter + args = ["tcpdump", "-r", "-", "-w", "-", bpf_filter] + log.debug("stream_pcaps_with_bpf args=%r", args) + process = await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + + # create task to stream pcaps into tcpdump's stdin + stream_task = asyncio.create_task(stream_pcaps(pcaps, process.stdin)) + stream_task.add_done_callback(lambda t: process.stdin.close()) + + # read tcpdump's stdout and yield chunks + while True: + chunk = await process.stdout.read(io.DEFAULT_BUFFER_SIZE) + if not chunk: + break + yield chunk From f4cf60e4207394237b95ea94294d728f2c65a8c9 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Fri, 21 Nov 2025 17:00:40 +0100 Subject: [PATCH 2/2] fix CI --- webapp/main.py | 1 - webapp/pcap.py | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/webapp/main.py b/webapp/main.py index aa9aef0..965e260 100644 --- a/webapp/main.py +++ b/webapp/main.py @@ -6,7 +6,6 @@ import asyncio import base64 import contextlib -import io import itertools import json import time diff --git a/webapp/pcap.py b/webapp/pcap.py index 8de22e5..7ef479a 100644 --- a/webapp/pcap.py +++ b/webapp/pcap.py @@ -1,7 +1,9 @@ +#!/usr/bin/env python3 +# Copyright (C) 2025 Yun Zheng Hu +# SPDX-License-Identifier: GPL-2.0-or-later import asyncio import io import logging -import sys from pathlib import Path from typing import AsyncIterable, BinaryIO, Iterable @@ -43,7 +45,7 @@ async def stream_pcaps(pcaps: Iterable[str | Path], fout: BinaryIO) -> None: ) skip = PCAP_HEADER_SIZE if i > 0 else 0 async with asyncio.TaskGroup() as tg: - stdout_task = tg.create_task(pipe_data(process.stdout, fout, skip=skip)) + tg.create_task(pipe_data(process.stdout, fout, skip=skip)) log.debug("finished stream_pcaps")