This tutorial guides you through the main features of PyETWkit.
pip install pyetwkitPyETWkit provides a command-line interface for quick ETW monitoring:
# List available providers
pyetwkit providers
# Search for specific providers
pyetwkit providers --search DNS
# List available profiles
pyetwkit profiles
# Listen to events (requires admin)
pyetwkit listen Microsoft-Windows-DNS-Client
# Use a profile
pyetwkit listen --profile networkfrom pyetwkit._core import EtwProvider, EtwSession
# Create a session
session = EtwSession("MySession")
# Add a provider
provider = EtwProvider(
"Microsoft-Windows-DNS-Client",
"Microsoft-Windows-DNS-Client"
)
provider = provider.with_level(4) # Info level
session.add_provider(provider)
# Start and process events
session.start()
try:
while True:
event = session.next_event_timeout(1000)
if event:
print(f"Event {event.event_id}: {event.provider_name}")
except KeyboardInterrupt:
pass
finally:
session.stop()An ETW session is a connection to the Windows Event Tracing infrastructure. Each session must have a unique name.
from pyetwkit._core import EtwSession
session = EtwSession("UniqueSessionName")Providers are the sources of ETW events. Each provider has a name and GUID.
from pyetwkit._core import EtwProvider
# Using provider name
provider = EtwProvider("Microsoft-Windows-DNS-Client", "DNS-Client")
# Set trace level (1=Critical, 2=Error, 3=Warning, 4=Info, 5=Verbose)
provider = provider.with_level(4)
# Enable all keywords (event categories)
provider = provider.with_any_keyword(0xFFFFFFFF)Find available providers on the system:
from pyetwkit._core import list_providers, search_providers, get_provider_info
# List all providers
providers = list_providers()
for p in providers[:10]:
print(f"{p.name}: {p.guid}")
# Search by name
kernel_providers = search_providers("Kernel")
# Get detailed info
info = get_provider_info("Microsoft-Windows-DNS-Client")
if info:
print(f"GUID: {info.guid}")Profiles are pre-configured sets of providers for common use cases:
from pyetwkit.profiles import list_profiles, get_profile
# List available profiles
for profile in list_profiles():
print(f"{profile.name}: {profile.description}")
# Get a specific profile
network = get_profile("network")
if network:
for p in network.providers:
print(f" {p.name}")Monitor kernel-level events like process creation:
from pyetwkit._core import PyKernelFlags, PyKernelSession
# Configure kernel flags
flags = PyKernelFlags()
flags = flags.with_process() # Process events
flags = flags.with_thread() # Thread events
flags = flags.with_image_load() # DLL/module loads
# Create and start session
session = PyKernelSession(flags)
session.start()
try:
while True:
event = session.next_event_timeout(1000)
if event:
if event.event_id == 1: # Process start
props = event.to_dict().get("properties", {})
print(f"Process started: {props.get('ImageFileName')}")
except KeyboardInterrupt:
pass
finally:
session.stop()Read events from saved ETL (Event Trace Log) files:
from pyetwkit._core import EtlReader
reader = EtlReader("trace.etl")
for event in reader.events():
print(f"[{event.timestamp}] {event.provider_name} Event {event.event_id}")Export captured events to various formats:
from pyetwkit.export import to_csv, to_json, to_jsonl, to_parquet
# Collect events
events = []
for event in reader.events():
events.append(event)
# Export to different formats
to_csv(events, "events.csv")
to_json(events, "events.json", indent=2)
to_jsonl(events, "events.jsonl")
to_parquet(events, "events.parquet") # Requires pyarrowExport ETL files from the command line:
# Export to CSV
pyetwkit export trace.etl -o events.csv
# Export to Parquet
pyetwkit export trace.etl -o events.parquet -f parquet
# Filter by provider
pyetwkit export trace.etl -o filtered.csv -p Microsoft-Windows-DNS-Client
# Limit number of events
pyetwkit export trace.etl -o sample.json -f json --limit 1000Each event contains:
event_id: Event type identifierprovider_name: Source provider nameprovider_id: Provider GUIDtimestamp: When the event occurredprocess_id: Source process IDthread_id: Source thread IDproperties: Event-specific data
event_dict = event.to_dict()
print(f"Event ID: {event_dict['event_id']}")
print(f"Properties: {event_dict.get('properties', {})}")- Use unique session names - Avoid conflicts with other ETW consumers
- Stop sessions properly - Always call
session.stop()in a finally block - Filter at source - Use trace levels and keywords to reduce event volume
- Run as administrator - ETW sessions require elevated privileges
- Handle timeouts - Use
next_event_timeout()for non-blocking operation
ETW sessions require administrator privileges. Run your script or terminal as Administrator.
If a previous session wasn't properly stopped, clean it up:
logman stop MySession -ets- Check that the provider name is correct
- Ensure the trace level includes the events you want
- Verify the application generating events is running