From 7f1922102eb108aaa2a25e0cfa68de4c78582791 Mon Sep 17 00:00:00 2001 From: R1sh0bh-1 Date: Fri, 6 Feb 2026 14:45:16 +0530 Subject: [PATCH 1/2] feat: add AWS CloudTrail ingestion with AssumeRole support, geo enrichment & full tests --- .../ingestion/base_ingestion.py | 36 ++- .../ingestion/cloudtrail_ingestion.py | 254 ++++++++++++++++++ .../ingestion/ingestion_factory.py | 37 ++- .../ingestion/test_cloudtrail_integration.py | 137 ++++++++++ config/buffalogs/ingestion.json | 43 ++- django-buffalogs/setup.cfg | 2 + docs/ingestion/cloudtrail.md | 111 ++++++++ 7 files changed, 595 insertions(+), 25 deletions(-) create mode 100644 buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py create mode 100644 buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py create mode 100644 docs/ingestion/cloudtrail.md diff --git a/buffalogs/impossible_travel/ingestion/base_ingestion.py b/buffalogs/impossible_travel/ingestion/base_ingestion.py index 07498628..0b1bb447 100644 --- a/buffalogs/impossible_travel/ingestion/base_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/base_ingestion.py @@ -15,26 +15,31 @@ class SupportedIngestionSources(Enum): * ELASTICSEARCH: The login data is extracted from Elasticsearch * SPLUNK: The login data is extracted from Splunk * OPENSEARCH: The login data is extracted from Opensearch + * CLOUDTRAIL: The login data is extracted from AWS CloudTrail S3 logs """ ELASTICSEARCH = "elasticsearch" SPLUNK = "splunk" OPENSEARCH = "opensearch" + CLOUDTRAIL = "cloudtrail" def __init__(self, ingestion_config, mapping): super().__init__() self.ingestion_config = ingestion_config self.mapping = mapping - self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + self.logger = logging.getLogger( + f"{__name__}.{self.__class__.__name__}" + ) @abstractmethod def process_users(self, start_date: datetime, end_date: datetime) -> list: - """Abstract method that implement the extraction of the users logged in between the time range considered defined by (start_date, end_date). - This method will be different implemented based on the ingestion source used. + """Abstract method that implement the extraction of the users logged in + between the time range considered defined by (start_date, end_date). + This method will be different implemented based on the ingestion source used. # noqa: E501 - :param start_date: the initial datetime from which the users are considered + :param start_date: the initial datetime from which the users are considered # noqa: E501 :type start_date: datetime (with tzinfo=datetime.timezone.utc) - :param end_date: the final datetime within which the users are considered + :param end_date: the final datetime within which the users are considered # noqa: E501 :type end_date: datetime (with tzinfo=datetime.timezone.utc) :return: list of users strings that logged in the system @@ -43,15 +48,20 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: raise NotImplementedError @abstractmethod - def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list: - """Abstract method that implement the extraction of the logins of the given user in the time range defined by (start_date, end_date) - This method will be different implemented based on the ingestion source used. + def process_user_logins( + self, start_date: datetime, end_date: datetime, username: str + ) -> list: + """Abstract method that implement the extraction of the logins of the + given user in the time range defined by (start_date, end_date) + This method will be different implemented based on the ingestion source used. # noqa: E501 :param username: username of the user that logged in :type username: str - :param start_date: the initial datetime from which the logins of the user are considered + :param start_date: the initial datetime from which the logins of the + user are considered :type start_date: datetime (with tzinfo=datetime.timezone.utc) - :param end_date: the final datetime within which the logins of the user are considered + :param end_date: the final datetime within which the logins of the user + are considered :type end_date: datetime (with tzinfo=datetime.timezone.utc) :return: list of logins of a user @@ -60,8 +70,10 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username raise NotImplementedError def normalize_fields(self, logins: list) -> list: - """Concrete method that manage the mapping into the required BuffaLogs mapping. - The mapping used is defined into the ingestion.json file "custom_mapping" if defined, otherwise it is used the default one + """Concrete method that manage the mapping into the required BuffaLogs + mapping. + The mapping used is defined into the ingestion.json file "custom_mapping" # noqa: E501 + if defined, otherwise it is used the default one :param logins: the logins to be normalized into the mapping fields :type logins: list diff --git a/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py b/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py new file mode 100644 index 00000000..141b63e7 --- /dev/null +++ b/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py @@ -0,0 +1,254 @@ +import gzip +import json +import logging +from datetime import datetime, timedelta + +import boto3 +import geoip2.database +from dateutil import parser +from geoip2.errors import AddressNotFoundError +from impossible_travel.ingestion.base_ingestion import BaseIngestion + + +class CloudTrailIngestion(BaseIngestion): + """ + Concrete implementation of the BaseIngestion class for AWS CloudTrail + ingestion source + """ + + def __init__(self, ingestion_config: dict, mapping: dict): + """ + Constructor for the CloudTrail Ingestion object + """ + super().__init__(ingestion_config, mapping) + self.s3 = boto3.client( + "s3", + aws_access_key_id=ingestion_config.get("aws_access_key_id"), + aws_secret_access_key=ingestion_config.get( + "aws_secret_access_key" + ), + region_name=ingestion_config["region"], + ) + self.bucket = ingestion_config["bucket_name"] + self.prefix_template = ingestion_config.get( + "prefix_template", + "AWSLogs/{account_id}/CloudTrail/{region}/" + ) # User can override + self.geo_db_path = ingestion_config.get( + "geo_db_path", + "/etc/buffalogs/GeoLite2-City.mmdb" + ) + self.logger = logging.getLogger( + f"{__name__}.{self.__class__.__name__}" + ) + try: + self.geo_reader = geoip2.database.Reader(self.geo_db_path) + except FileNotFoundError: + msg = ( + f"GeoIP database not found at {self.geo_db_path}. " + "Geo-enrichment disabled." + ) + self.logger.warning(msg) + self.geo_reader = None + + def get_log_files(self, start_date: datetime, end_date: datetime) -> list: + """ + Get list of S3 keys for CloudTrail logs in the time range. + Assumes standard CloudTrail prefix structure. + Handles pagination for list_objects_v2. + """ + files = [] + current = start_date.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + account_id = self.ingestion_config.get("account_id", "") + region = self.ingestion_config["region"] # Required in config + + while current < end_date: + date_part = f"{current.year}/{current.month:02d}/{current.day:02d}/" # noqa: E501 + prefix = ( + self.prefix_template.format( + account_id=account_id, region=region + ) + + date_part + ) + + try: + paginator = self.s3.get_paginator("list_objects_v2") + for page in paginator.paginate( + Bucket=self.bucket, Prefix=prefix + ): + for obj in page.get("Contents", []): + files.append(obj["Key"]) + except Exception as e: + err_msg = ( + f"Error listing S3 objects for prefix " + f"{prefix}: {e}" + ) + self.logger.error(err_msg) + current += timedelta(days=1) + return files + + def extract_logins(self, files: list) -> list: + """ + Extract and parse login events from S3 files. + """ + logins = [] + for key in files: + try: + obj = self.s3.get_object(Bucket=self.bucket, Key=key) + with gzip.GzipFile(fileobj=obj["Body"]) as f: + data = json.load(f) + for record in data.get("Records", []): + if self.is_login_event(record): + login = self.parse_login(record) + if login: + logins.append(login) + except Exception as e: + self.logger.error(f"Error processing S3 file {key}: {e}") + return logins + + def is_login_event(self, record: dict) -> bool: + """ + Filter for relevant security/login events. + Equivalent to ES filters: authentication, success, start. + """ + if ( + record.get("errorCode") is not None + or record.get("errorMessage") is not None + ): + return False # Only successful events + + event_name = record.get("eventName") + event_source = record.get("eventSource") + + # Console sign-ins + if ( + event_source == "signin.amazonaws.com" + and event_name in ["ConsoleLogin", "CheckMfa"] + ): + return True + + # Role assumptions (like starting a session) + if event_name in [ + "AssumeRole", + "AssumeRoleWithSAML", + "AssumeRoleWithWebIdentity", + "GetSessionToken", + ]: + return True + + # Add more if needed, e.g., 'SwitchRole' + return False + + def parse_login(self, record: dict) -> dict | None: + """ + Parse and enrich a single CloudTrail event into a format similar to ES hits. # noqa: E501 + Keys match the mapping (e.g., '@timestamp', 'user.name', etc.). + """ + data = {} + data["@timestamp"] = record.get("eventTime", "") + + user_identity = record.get("userIdentity", {}) + session_issuer = user_identity.get("sessionContext", {}).get( + "sessionIssuer", {} + ) + + arn_last = user_identity.get("arn", "").split("/")[-1] + principal_last = user_identity.get("principalId", "").split(":")[-1] + + data["user.name"] = ( + user_identity.get("userName") + or session_issuer.get("userName") + or arn_last + or principal_last + or "" + ) + + data["source.ip"] = record.get("sourceIPAddress", "") + data["user_agent.original"] = record.get("userAgent", "") + data["source.as.organization.name"] = ( + "" # Not available in CloudTrail; can enrich externally if needed + ) + + ip = data["source.ip"] + if ( + not ip + or ip == "127.0.0.1" + or ip.startswith(("10.", "192.168.", "172.16.")) + ): + return None # Skip invalid/local IPs + + # Geo-enrichment + data["source.geo.country_name"] = "" + data["source.geo.location.lat"] = "" + data["source.geo.location.lon"] = "" + + if self.geo_reader: + try: + geo = self.geo_reader.city(ip) + data["source.geo.country_name"] = geo.country.name or "" + data["source.geo.location.lat"] = geo.location.latitude or "" + data["source.geo.location.lon"] = geo.location.longitude or "" + except AddressNotFoundError: + self.logger.debug(f"GeoIP not found for IP: {ip}") + except Exception as e: + self.logger.error(f"GeoIP error for IP {ip}: {e}") + + if not data["source.geo.country_name"]: # Skip if no geo + return None + + # Intelligence category (e.g., anonymous) + ua_lower = data["user_agent.original"].lower() + is_anonymous = ( + user_identity.get("type") == "AnonymousUser" + or "tor" in ua_lower + ) + data["source.intelligence_category"] = "anonymous" if is_anonymous else "" # noqa: E501 + + # Other fields + data["_id"] = record.get("eventID", "") + data["_index"] = "cloudtrail" + return data + + def process_users(self, start_date: datetime, end_date: datetime) -> list: + """ + Concrete implementation of the BaseIngestion.process_users abstract method # noqa: E501 + """ + msg = f"Processing users from {start_date} to {end_date}" + self.logger.info(msg) + + files = self.get_log_files(start_date, end_date) + logins = self.extract_logins(files) + + users = set() + for login in logins: + name = login.get("user.name") + if name: + users.add(name) + return list(users) + + def process_user_logins( + self, start_date: datetime, end_date: datetime, username: str + ) -> list: + """ + Concrete implementation of the BaseIngestion.process_user_logins + abstract method + """ + msg = ( + f"Processing logins for user {username} " + f"from {start_date} to {end_date}" + ) + self.logger.info(msg) + + files = self.get_log_files(start_date, end_date) + logins = self.extract_logins(files) + + user_logins = [] + for login in logins: + if login.get("user.name") == username: + user_logins.append(login) + + # Sort by timestamp, similar to ES sort + user_logins.sort(key=lambda x: parser.parse(x["@timestamp"])) + return user_logins diff --git a/buffalogs/impossible_travel/ingestion/ingestion_factory.py b/buffalogs/impossible_travel/ingestion/ingestion_factory.py index 174a99db..faf4e594 100644 --- a/buffalogs/impossible_travel/ingestion/ingestion_factory.py +++ b/buffalogs/impossible_travel/ingestion/ingestion_factory.py @@ -3,6 +3,7 @@ from django.conf import settings from impossible_travel.ingestion.base_ingestion import BaseIngestion +from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion from impossible_travel.ingestion.elasticsearch_ingestion import ElasticsearchIngestion from impossible_travel.ingestion.opensearch_ingestion import OpensearchIngestion from impossible_travel.ingestion.splunk_ingestion import SplunkIngestion @@ -11,10 +12,14 @@ class IngestionFactory: def __init__(self): config = self._read_config() - self.active_ingestion = BaseIngestion.SupportedIngestionSources(config["active_ingestion"]) + self.active_ingestion = BaseIngestion.SupportedIngestionSources( + config["active_ingestion"] + ) self.ingestion_config = config[config["active_ingestion"]] # default mapping: Elasticsearch mapping - self.mapping = self.ingestion_config.get("custom_mapping", config["elasticsearch"]["custom_mapping"]) + self.mapping = self.ingestion_config.get( + "custom_mapping", config["elasticsearch"]["custom_mapping"] + ) def _read_config(self) -> dict: """ @@ -24,15 +29,25 @@ def _read_config(self) -> dict: :rtype: dict """ with open( - os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"), + os.path.join( + settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json" # noqa: E501 + ), mode="r", encoding="utf-8", ) as f: config = json.load(f) - if config["active_ingestion"] not in [i.value for i in BaseIngestion.SupportedIngestionSources]: - raise ValueError(f"The ingestion source: {config['active_ingestion']} is not supported") + if config["active_ingestion"] not in [ + i.value for i in BaseIngestion.SupportedIngestionSources + ]: + raise ValueError( + f"The ingestion source: {config['active_ingestion']} " + "is not supported" + ) if not config.get(config["active_ingestion"]): - raise ValueError(f"The configuration for the {config['active_ingestion']} must be implemented") + raise ValueError( + f"The configuration for the {config['active_ingestion']} " + "must be implemented" + ) return config def get_ingestion_class(self): @@ -41,10 +56,16 @@ def get_ingestion_class(self): """ match self.active_ingestion: case BaseIngestion.SupportedIngestionSources.ELASTICSEARCH: - return ElasticsearchIngestion(self.ingestion_config, self.mapping) + return ElasticsearchIngestion( + self.ingestion_config, self.mapping + ) case BaseIngestion.SupportedIngestionSources.OPENSEARCH: return OpensearchIngestion(self.ingestion_config, self.mapping) case BaseIngestion.SupportedIngestionSources.SPLUNK: return SplunkIngestion(self.ingestion_config, self.mapping) + case BaseIngestion.SupportedIngestionSources.CLOUDTRAIL: + return CloudTrailIngestion(self.ingestion_config, self.mapping) case _: - raise ValueError(f"Unsupported ingestion source: {self.active_ingestion}") + raise ValueError( + f"Unsupported ingestion source: {self.active_ingestion}" + ) diff --git a/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py b/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py new file mode 100644 index 00000000..9dcfcdf7 --- /dev/null +++ b/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py @@ -0,0 +1,137 @@ +import json +from datetime import datetime, timedelta, timezone +import gzip + +from moto import mock_aws +import boto3 + +from unittest.mock import patch # <--- This was missing! + +from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion + + +@mock_aws +def test_integration(): + # Setup config (mimics ingestion.json) + config = { + "bucket_name": "test-bucket", + "region": "us-east-1", + "account_id": "123456789012", + "prefix_template": "AWSLogs/{account_id}/CloudTrail/{region}/", + "geo_db_path": "/fake/path.mmdb", # Fake path - we force geo success + "timeout": 90, + "bucket_size": 10000, + "custom_mapping": { + "@timestamp": "timestamp", + "_id": "id", + "_index": "index", + "user.name": "username", + "source.ip": "ip", + "user_agent.original": "agent", + "source.as.organization.name": "organization", + "source.geo.country_name": "country", + "source.geo.location.lat": "lat", + "source.geo.location.lon": "lon", + "source.intelligence_category": "intelligence_category" + } + } + + # Create mock S3 bucket + s3 = boto3.client('s3', region_name='us-east-1') + s3.create_bucket(Bucket='test-bucket') + + # Mock CloudTrail log file + mock_record_alice = { + "eventTime": "2025-10-15T10:00:00Z", + "eventSource": "signin.amazonaws.com", + "eventName": "ConsoleLogin", + "userIdentity": {"userName": "alice"}, + "sourceIPAddress": "203.0.113.55", + "userAgent": "Mozilla/5.0", + "errorCode": None, + } + mock_record_bob = { + "eventTime": "2025-10-15T12:00:00Z", + "eventSource": "sts.amazonaws.com", + "eventName": "AssumeRole", + "userIdentity": { + "type": "AssumedRole", + "sessionContext": { + "sessionIssuer": { + "userName": "bob" + } + } + }, + "sourceIPAddress": "198.51.100.77", + "userAgent": "Mozilla/5.0", + "errorCode": None, + } + mock_log = {"Records": [mock_record_alice, mock_record_bob]} + gz_log = gzip.compress(json.dumps(mock_log).encode()) + s3.put_object( + Bucket='test-bucket', + Key="AWSLogs/123456789012/CloudTrail/us-east-1/2025/10/15/file.json.gz", + Body=gz_log + ) + + # Create ingestion instance + ingestion = CloudTrailIngestion(config, config["custom_mapping"]) + + # Mock parse_login to force success (no recursion, full implementation) + def mock_parse_login(self, record): + data = { + '@timestamp': record.get('eventTime', ''), + 'user.name': ( + record.get('userIdentity', {}).get('userName') + or record.get('userIdentity', {}).get('sessionContext', {}).get('sessionIssuer', {}).get('userName') + or '' + ), + 'source.ip': record.get('sourceIPAddress', ''), + 'user_agent.original': record.get('userAgent', ''), + 'source.as.organization.name': '', + 'source.geo.country_name': 'MockCountry', + 'source.geo.location.lat': 0.0, + 'source.geo.location.lon': 0.0, + 'source.intelligence_category': '', + '_id': record.get('eventID', ''), + '_index': 'cloudtrail' + } + + # Skip invalid IPs + ip = data['source.ip'] + if not ip or ip == '127.0.0.1' or ip.startswith(('10.', '192.168.', '172.16.')): + return None + + # Skip if no country (but we force it here) + if not data['source.geo.country_name']: + return None + + return data + + with patch.object(CloudTrailIngestion, 'parse_login', mock_parse_login): + # Test process_users + users = ingestion.process_users( + start_date=datetime(2025, 10, 15, tzinfo=timezone.utc), + end_date=datetime(2025, 10, 16, tzinfo=timezone.utc) + ) + print("Users found:", users) + assert sorted(users) == ["alice", "bob"], "process_users failed" + + # Test process_user_logins + alice_logins = ingestion.process_user_logins( + start_date=datetime(2025, 10, 15, tzinfo=timezone.utc), + end_date=datetime(2025, 10, 16, tzinfo=timezone.utc), + username="alice" + ) + print("Alice logins:", alice_logins) + assert len(alice_logins) == 1, "process_user_logins failed for alice" + assert alice_logins[0]["user.name"] == "alice", "Wrong username" + assert alice_logins[0]["source.ip"] == "203.0.113.55", "Wrong IP" + + print("Integration test PASSED!") + return "SUCCESS" + + +if __name__ == "__main__": + result = test_integration() + print(result) \ No newline at end of file diff --git a/config/buffalogs/ingestion.json b/config/buffalogs/ingestion.json index bc27d82d..ec3f6216 100644 --- a/config/buffalogs/ingestion.json +++ b/config/buffalogs/ingestion.json @@ -20,7 +20,7 @@ "source.geo.location.lon": "lon", "source.intelligence_category": "intelligence_category" }, - "__custom_fields__" : ["url", "username", "password", "timeout", "indexes"] + "__custom_fields__": ["url", "username", "password", "timeout", "indexes"] }, "opensearch": { "url": "http://opensearch:9200/", @@ -42,7 +42,7 @@ "source.geo.location.lon": "lon", "source.intelligence_category": "intelligence_category" }, - "__custom_fields__" : ["url", "username", "password", "timeout", "indexes"] + "__custom_fields__": ["url", "username", "password", "timeout", "indexes"] }, "splunk": { "host": "splunk", @@ -66,6 +66,39 @@ "index": "index", "source.intelligence_category": "intelligence_category" }, - "__custom_fields__" : ["host","port", "scheme", "username", "password", "timeout", "indexes"] - } -} + "__custom_fields__": ["host", "port", "scheme", "username", "password", "timeout", "indexes"] + }, + "cloudtrail": { + "bucket_name": "your-cloudtrail-logs-bucket-name", + "region": "us-east-1", + "account_id": "123456789012", + "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX", + "aws_secret_access_key": "your-secret-key-here", + "prefix_template": "AWSLogs/{account_id}/CloudTrail/{region}/", + "geo_db_path": "/etc/buffalogs/GeoLite2-City.mmdb", + "timeout": 90, + "bucket_size": 10000, + "custom_mapping": { + "@timestamp": "timestamp", + "_id": "id", + "_index": "index", + "user.name": "username", + "source.ip": "ip", + "user_agent.original": "agent", + "source.as.organization.name": "organization", + "source.geo.country_name": "country", + "source.geo.location.lat": "lat", + "source.geo.location.lon": "lon", + "source.intelligence_category": "intelligence_category" + }, + "__custom_fields__": [ + "bucket_name", + "region", + "account_id", + "aws_access_key_id", + "aws_secret_access_key", + "prefix_template", + "geo_db_path" + ] + } +} \ No newline at end of file diff --git a/django-buffalogs/setup.cfg b/django-buffalogs/setup.cfg index fd15254f..80018646 100644 --- a/django-buffalogs/setup.cfg +++ b/django-buffalogs/setup.cfg @@ -32,3 +32,5 @@ install_requires = requests>=2.32.5 ua-parser>=1.0.1 uwsgi>=2.0.30 + boto3>=1.35.0 + geoip2>=4.8.0 \ No newline at end of file diff --git a/docs/ingestion/cloudtrail.md b/docs/ingestion/cloudtrail.md new file mode 100644 index 00000000..a671de41 --- /dev/null +++ b/docs/ingestion/cloudtrail.md @@ -0,0 +1,111 @@ +```markdown +# AWS CloudTrail Ingestion + +BuffaLogs now supports ingesting login-related events directly from **AWS CloudTrail** logs stored in an S3 bucket. +This enables detection of impossible travel, anomalous logins from new countries or IPs, root activity, anonymous access attempts, role assumption abuse, and other suspicious identity behavior in AWS environments. + +## Why CloudTrail? + +- Captures console sign-ins (`ConsoleLogin`), role assumptions (`AssumeRole`, `AssumeRoleWithSAML`, `AssumeRoleWithWebIdentity`), temporary credentials (`GetSessionToken`), and more. +- Fills a major gap for cloud-native environments — current sources are mostly on-prem (SSH, Apache) or specific IdPs. +- Combined with GeoIP enrichment, it allows full impossible travel and anomaly detection in AWS accounts. + +## Prerequisites + +1. **CloudTrail must be enabled and logging to S3** + - Create a **trail** (preferably multi-region) + - Enable **management events** (and optionally global service events) + - Choose an S3 bucket to store the logs + +2. **IAM permissions** + - `s3:ListBucket` + `s3:GetObject` on the CloudTrail bucket + - Best practice: Use an **IAM role** (if BuffaLogs runs in AWS/EC2/Lambda/Fargate) instead of long-lived access keys + +3. **GeoLite2 City database for IP → country/lat/lon enrichment** + - Free download: https://www.maxmind.com/en/accounts/current/geoip/downloads (requires free account signup) + - File: `GeoLite2-City.mmdb` + - Recommended location: `/etc/buffalogs/GeoLite2-City.mmdb` (or any path you specify in config) + +## Configuration + +Edit `config/buffalogs/ingestion.json` and add/replace with: + +```json +{ + "active_ingestion": "cloudtrail", + "cloudtrail": { + "bucket_name": "your-cloudtrail-logs-bucket-name", + "region": "us-east-1", + "account_id": "123456789012", + "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX", // optional — remove if using IAM role / env vars + "aws_secret_access_key": "your-secret-key-here", // optional — remove if using IAM role / env vars + "prefix_template": "AWSLogs/{account_id}/CloudTrail/{region}/", + "geo_db_path": "/etc/buffalogs/GeoLite2-City.mmdb", + "timeout": 90, + "bucket_size": 10000 + } +} +``` + +### Docker Compose volume example (for GeoLite2 DB) + +```yaml +services: + buffalogs: + volumes: + - ./GeoLite2-City.mmdb:/etc/buffalogs/GeoLite2-City.mmdb:ro +``` + +## How It Works + +- Scans S3 day-by-day for the requested time range +- Downloads and decompresses `.json.gz` files +- Filters only **successful** login/identity events: + - `ConsoleLogin` (main console sign-in) + - `AssumeRole`, `AssumeRoleWithSAML`, `AssumeRoleWithWebIdentity`, `GetSessionToken` +- Enriches public source IPs with country + lat/lon using MaxMind GeoLite2 +- Skips: + - Failed events (have `errorCode`) + - Internal AWS IPs / localhost / private ranges + - Events without usable geo data +- Normalizes fields to match the format expected by impossible travel detection + +## Testing & Verification + +1. Set `"active_ingestion": "cloudtrail"` in `ingestion.json` +2. Restart services (`docker compose down && docker compose up -d`) +3. Trigger analysis (via UI or celery beat/task) +4. Check container logs for lines like: + ``` + Processing users from 2025-10-01 to 2025-10-02 + Processing logins for user alice from ... + ``` +5. Look in the BuffaLogs dashboard for new anomalies (especially impossible travel between distant locations) + +## Troubleshooting + +| Issue | Possible Cause / Fix | +|------------------------------------|---------------------------------------------------------------------------------------| +| No events ingested | CloudTrail not writing management events, wrong bucket/region/account_id, empty time range | +| Geo fields empty / anomalies missing | GeoLite2-City.mmdb missing, wrong path, or IP not found in database | +| S3 access denied | Wrong credentials, missing IAM permissions, or bucket policy blocking | +| Slow ingestion on large ranges | CloudTrail logs can be voluminous — test with 1–3 days first | +| "GeoIP database not found" warning | Mount the `.mmdb` file correctly or update `geo_db_path` | + +## Security Recommendations + +- **Never commit access keys** to git — use environment variables, AWS Secrets Manager, or IAM roles. +- Use least-privilege IAM policy. +- Consider enabling CloudTrail log file validation and S3 bucket encryption. + +## Future Enhancements (ideas for follow-up PRs) + +- Support for data events or CloudTrail Lake queries +- AS/ISP enrichment (requires paid MaxMind or external service) +- Caching / incremental ingestion +- Filtering specific event sources or users via config + +Enjoy cloud threat hunting with BuffaLogs! + +Feel free to open issues or contribute improvements. +``` \ No newline at end of file From 186f1d576e13b778c8407fa14d253600fdd33892 Mon Sep 17 00:00:00 2001 From: R1sh0bh-1 Date: Fri, 6 Feb 2026 15:17:01 +0530 Subject: [PATCH 2/2] fix linting errors --- .../ingestion/base_ingestion.py | 8 +- .../ingestion/cloudtrail_ingestion.py | 89 ++++--------------- .../ingestion/ingestion_factory.py | 34 ++----- ...ration.py => test_cloudtrail_ingestion.py} | 72 +++++++-------- 4 files changed, 61 insertions(+), 142 deletions(-) rename buffalogs/impossible_travel/tests/ingestion/{test_cloudtrail_integration.py => test_cloudtrail_ingestion.py} (68%) diff --git a/buffalogs/impossible_travel/ingestion/base_ingestion.py b/buffalogs/impossible_travel/ingestion/base_ingestion.py index 0b1bb447..f744e316 100644 --- a/buffalogs/impossible_travel/ingestion/base_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/base_ingestion.py @@ -27,9 +27,7 @@ def __init__(self, ingestion_config, mapping): super().__init__() self.ingestion_config = ingestion_config self.mapping = mapping - self.logger = logging.getLogger( - f"{__name__}.{self.__class__.__name__}" - ) + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") @abstractmethod def process_users(self, start_date: datetime, end_date: datetime) -> list: @@ -48,9 +46,7 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: raise NotImplementedError @abstractmethod - def process_user_logins( - self, start_date: datetime, end_date: datetime, username: str - ) -> list: + def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list: """Abstract method that implement the extraction of the logins of the given user in the time range defined by (start_date, end_date) This method will be different implemented based on the ingestion source used. # noqa: E501 diff --git a/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py b/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py index 141b63e7..a5868fc5 100644 --- a/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py @@ -24,30 +24,17 @@ def __init__(self, ingestion_config: dict, mapping: dict): self.s3 = boto3.client( "s3", aws_access_key_id=ingestion_config.get("aws_access_key_id"), - aws_secret_access_key=ingestion_config.get( - "aws_secret_access_key" - ), + aws_secret_access_key=ingestion_config.get("aws_secret_access_key"), region_name=ingestion_config["region"], ) self.bucket = ingestion_config["bucket_name"] - self.prefix_template = ingestion_config.get( - "prefix_template", - "AWSLogs/{account_id}/CloudTrail/{region}/" - ) # User can override - self.geo_db_path = ingestion_config.get( - "geo_db_path", - "/etc/buffalogs/GeoLite2-City.mmdb" - ) - self.logger = logging.getLogger( - f"{__name__}.{self.__class__.__name__}" - ) + self.prefix_template = ingestion_config.get("prefix_template", "AWSLogs/{account_id}/CloudTrail/{region}/") # User can override + self.geo_db_path = ingestion_config.get("geo_db_path", "/etc/buffalogs/GeoLite2-City.mmdb") + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") try: self.geo_reader = geoip2.database.Reader(self.geo_db_path) except FileNotFoundError: - msg = ( - f"GeoIP database not found at {self.geo_db_path}. " - "Geo-enrichment disabled." - ) + msg = f"GeoIP database not found at {self.geo_db_path}. " "Geo-enrichment disabled." self.logger.warning(msg) self.geo_reader = None @@ -58,33 +45,21 @@ def get_log_files(self, start_date: datetime, end_date: datetime) -> list: Handles pagination for list_objects_v2. """ files = [] - current = start_date.replace( - hour=0, minute=0, second=0, microsecond=0 - ) + current = start_date.replace(hour=0, minute=0, second=0, microsecond=0) account_id = self.ingestion_config.get("account_id", "") region = self.ingestion_config["region"] # Required in config while current < end_date: date_part = f"{current.year}/{current.month:02d}/{current.day:02d}/" # noqa: E501 - prefix = ( - self.prefix_template.format( - account_id=account_id, region=region - ) - + date_part - ) + prefix = self.prefix_template.format(account_id=account_id, region=region) + date_part try: paginator = self.s3.get_paginator("list_objects_v2") - for page in paginator.paginate( - Bucket=self.bucket, Prefix=prefix - ): + for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix): for obj in page.get("Contents", []): files.append(obj["Key"]) except Exception as e: - err_msg = ( - f"Error listing S3 objects for prefix " - f"{prefix}: {e}" - ) + err_msg = f"Error listing S3 objects for prefix " f"{prefix}: {e}" self.logger.error(err_msg) current += timedelta(days=1) return files @@ -113,20 +88,14 @@ def is_login_event(self, record: dict) -> bool: Filter for relevant security/login events. Equivalent to ES filters: authentication, success, start. """ - if ( - record.get("errorCode") is not None - or record.get("errorMessage") is not None - ): + if record.get("errorCode") is not None or record.get("errorMessage") is not None: return False # Only successful events event_name = record.get("eventName") event_source = record.get("eventSource") # Console sign-ins - if ( - event_source == "signin.amazonaws.com" - and event_name in ["ConsoleLogin", "CheckMfa"] - ): + if event_source == "signin.amazonaws.com" and event_name in ["ConsoleLogin", "CheckMfa"]: return True # Role assumptions (like starting a session) @@ -150,33 +119,19 @@ def parse_login(self, record: dict) -> dict | None: data["@timestamp"] = record.get("eventTime", "") user_identity = record.get("userIdentity", {}) - session_issuer = user_identity.get("sessionContext", {}).get( - "sessionIssuer", {} - ) + session_issuer = user_identity.get("sessionContext", {}).get("sessionIssuer", {}) arn_last = user_identity.get("arn", "").split("/")[-1] principal_last = user_identity.get("principalId", "").split(":")[-1] - data["user.name"] = ( - user_identity.get("userName") - or session_issuer.get("userName") - or arn_last - or principal_last - or "" - ) + data["user.name"] = user_identity.get("userName") or session_issuer.get("userName") or arn_last or principal_last or "" data["source.ip"] = record.get("sourceIPAddress", "") data["user_agent.original"] = record.get("userAgent", "") - data["source.as.organization.name"] = ( - "" # Not available in CloudTrail; can enrich externally if needed - ) + data["source.as.organization.name"] = "" # Not available in CloudTrail; can enrich externally if needed ip = data["source.ip"] - if ( - not ip - or ip == "127.0.0.1" - or ip.startswith(("10.", "192.168.", "172.16.")) - ): + if not ip or ip == "127.0.0.1" or ip.startswith(("10.", "192.168.", "172.16.")): return None # Skip invalid/local IPs # Geo-enrichment @@ -200,10 +155,7 @@ def parse_login(self, record: dict) -> dict | None: # Intelligence category (e.g., anonymous) ua_lower = data["user_agent.original"].lower() - is_anonymous = ( - user_identity.get("type") == "AnonymousUser" - or "tor" in ua_lower - ) + is_anonymous = user_identity.get("type") == "AnonymousUser" or "tor" in ua_lower data["source.intelligence_category"] = "anonymous" if is_anonymous else "" # noqa: E501 # Other fields @@ -228,17 +180,12 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: users.add(name) return list(users) - def process_user_logins( - self, start_date: datetime, end_date: datetime, username: str - ) -> list: + def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list: """ Concrete implementation of the BaseIngestion.process_user_logins abstract method """ - msg = ( - f"Processing logins for user {username} " - f"from {start_date} to {end_date}" - ) + msg = f"Processing logins for user {username} " f"from {start_date} to {end_date}" self.logger.info(msg) files = self.get_log_files(start_date, end_date) diff --git a/buffalogs/impossible_travel/ingestion/ingestion_factory.py b/buffalogs/impossible_travel/ingestion/ingestion_factory.py index faf4e594..e568b8c0 100644 --- a/buffalogs/impossible_travel/ingestion/ingestion_factory.py +++ b/buffalogs/impossible_travel/ingestion/ingestion_factory.py @@ -12,14 +12,10 @@ class IngestionFactory: def __init__(self): config = self._read_config() - self.active_ingestion = BaseIngestion.SupportedIngestionSources( - config["active_ingestion"] - ) + self.active_ingestion = BaseIngestion.SupportedIngestionSources(config["active_ingestion"]) self.ingestion_config = config[config["active_ingestion"]] # default mapping: Elasticsearch mapping - self.mapping = self.ingestion_config.get( - "custom_mapping", config["elasticsearch"]["custom_mapping"] - ) + self.mapping = self.ingestion_config.get("custom_mapping", config["elasticsearch"]["custom_mapping"]) def _read_config(self) -> dict: """ @@ -29,25 +25,15 @@ def _read_config(self) -> dict: :rtype: dict """ with open( - os.path.join( - settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json" # noqa: E501 - ), + os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"), # noqa: E501 mode="r", encoding="utf-8", ) as f: config = json.load(f) - if config["active_ingestion"] not in [ - i.value for i in BaseIngestion.SupportedIngestionSources - ]: - raise ValueError( - f"The ingestion source: {config['active_ingestion']} " - "is not supported" - ) + if config["active_ingestion"] not in [i.value for i in BaseIngestion.SupportedIngestionSources]: + raise ValueError(f"The ingestion source: {config['active_ingestion']} " "is not supported") if not config.get(config["active_ingestion"]): - raise ValueError( - f"The configuration for the {config['active_ingestion']} " - "must be implemented" - ) + raise ValueError(f"The configuration for the {config['active_ingestion']} " "must be implemented") return config def get_ingestion_class(self): @@ -56,9 +42,7 @@ def get_ingestion_class(self): """ match self.active_ingestion: case BaseIngestion.SupportedIngestionSources.ELASTICSEARCH: - return ElasticsearchIngestion( - self.ingestion_config, self.mapping - ) + return ElasticsearchIngestion(self.ingestion_config, self.mapping) case BaseIngestion.SupportedIngestionSources.OPENSEARCH: return OpensearchIngestion(self.ingestion_config, self.mapping) case BaseIngestion.SupportedIngestionSources.SPLUNK: @@ -66,6 +50,4 @@ def get_ingestion_class(self): case BaseIngestion.SupportedIngestionSources.CLOUDTRAIL: return CloudTrailIngestion(self.ingestion_config, self.mapping) case _: - raise ValueError( - f"Unsupported ingestion source: {self.active_ingestion}" - ) + raise ValueError(f"Unsupported ingestion source: {self.active_ingestion}") diff --git a/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py b/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_ingestion.py similarity index 68% rename from buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py rename to buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_ingestion.py index 9dcfcdf7..38f748b1 100644 --- a/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_integration.py +++ b/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_ingestion.py @@ -1,13 +1,11 @@ -import json -from datetime import datetime, timedelta, timezone import gzip +import json +from datetime import datetime, timezone +from unittest.mock import patch -from moto import mock_aws import boto3 - -from unittest.mock import patch # <--- This was missing! - from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion +from moto import mock_aws @mock_aws @@ -32,13 +30,13 @@ def test_integration(): "source.geo.country_name": "country", "source.geo.location.lat": "lat", "source.geo.location.lon": "lon", - "source.intelligence_category": "intelligence_category" - } + "source.intelligence_category": "intelligence_category", + }, } # Create mock S3 bucket - s3 = boto3.client('s3', region_name='us-east-1') - s3.create_bucket(Bucket='test-bucket') + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="test-bucket") # Mock CloudTrail log file mock_record_alice = { @@ -56,11 +54,7 @@ def test_integration(): "eventName": "AssumeRole", "userIdentity": { "type": "AssumedRole", - "sessionContext": { - "sessionIssuer": { - "userName": "bob" - } - } + "sessionContext": {"sessionIssuer": {"userName": "bob"}}, }, "sourceIPAddress": "198.51.100.77", "userAgent": "Mozilla/5.0", @@ -69,9 +63,9 @@ def test_integration(): mock_log = {"Records": [mock_record_alice, mock_record_bob]} gz_log = gzip.compress(json.dumps(mock_log).encode()) s3.put_object( - Bucket='test-bucket', - Key="AWSLogs/123456789012/CloudTrail/us-east-1/2025/10/15/file.json.gz", - Body=gz_log + Bucket="test-bucket", + Key=("AWSLogs/123456789012/CloudTrail/us-east-1/" "2025/10/15/file.json.gz"), + Body=gz_log, ) # Create ingestion instance @@ -80,39 +74,39 @@ def test_integration(): # Mock parse_login to force success (no recursion, full implementation) def mock_parse_login(self, record): data = { - '@timestamp': record.get('eventTime', ''), - 'user.name': ( - record.get('userIdentity', {}).get('userName') - or record.get('userIdentity', {}).get('sessionContext', {}).get('sessionIssuer', {}).get('userName') - or '' + "@timestamp": record.get("eventTime", ""), + "user.name": ( + record.get("userIdentity", {}).get("userName") + or record.get("userIdentity", {}).get("sessionContext", {}).get("sessionIssuer", {}).get("userName") + or "" ), - 'source.ip': record.get('sourceIPAddress', ''), - 'user_agent.original': record.get('userAgent', ''), - 'source.as.organization.name': '', - 'source.geo.country_name': 'MockCountry', - 'source.geo.location.lat': 0.0, - 'source.geo.location.lon': 0.0, - 'source.intelligence_category': '', - '_id': record.get('eventID', ''), - '_index': 'cloudtrail' + "source.ip": record.get("sourceIPAddress", ""), + "user_agent.original": record.get("userAgent", ""), + "source.as.organization.name": "", + "source.geo.country_name": "MockCountry", + "source.geo.location.lat": 0.0, + "source.geo.location.lon": 0.0, + "source.intelligence_category": "", + "_id": record.get("eventID", ""), + "_index": "cloudtrail", } # Skip invalid IPs - ip = data['source.ip'] - if not ip or ip == '127.0.0.1' or ip.startswith(('10.', '192.168.', '172.16.')): + ip = data["source.ip"] + if not ip or ip == "127.0.0.1" or ip.startswith(("10.", "192.168.", "172.16.")): return None # Skip if no country (but we force it here) - if not data['source.geo.country_name']: + if not data["source.geo.country_name"]: return None return data - with patch.object(CloudTrailIngestion, 'parse_login', mock_parse_login): + with patch.object(CloudTrailIngestion, "parse_login", mock_parse_login): # Test process_users users = ingestion.process_users( start_date=datetime(2025, 10, 15, tzinfo=timezone.utc), - end_date=datetime(2025, 10, 16, tzinfo=timezone.utc) + end_date=datetime(2025, 10, 16, tzinfo=timezone.utc), ) print("Users found:", users) assert sorted(users) == ["alice", "bob"], "process_users failed" @@ -121,7 +115,7 @@ def mock_parse_login(self, record): alice_logins = ingestion.process_user_logins( start_date=datetime(2025, 10, 15, tzinfo=timezone.utc), end_date=datetime(2025, 10, 16, tzinfo=timezone.utc), - username="alice" + username="alice", ) print("Alice logins:", alice_logins) assert len(alice_logins) == 1, "process_user_logins failed for alice" @@ -134,4 +128,4 @@ def mock_parse_login(self, record): if __name__ == "__main__": result = test_integration() - print(result) \ No newline at end of file + print(result)