Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 82 additions & 127 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<img src="https://github.com/glassflow/glassflow-python-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
</a>
<!-- Pytest Coverage Comment:Begin -->
<img src=https://img.shields.io/badge/coverage-90%25-brightgreen>
<img src=https://img.shields.io/badge/coverage-92%25-brightgreen>
<!-- Pytest Coverage Comment:End -->
</p>

Expand All @@ -24,9 +24,12 @@ A Python SDK for creating and managing data pipelines between Kafka and ClickHou
## Features

- Create and manage data pipelines between Kafka and ClickHouse
- Ingest from Kafka topics or OTLP signals (logs, metrics, traces)
- Deduplication of events during a time window based on a key
- Temporal joins between topics based on a common key with a given time window
- Per-topic Schema Registry integration
- Schema validation and configuration management
- Fine-grained resource control per pipeline component

## Installation

Expand All @@ -41,102 +44,74 @@ pip install glassflow
```python
from glassflow.etl import Client

# Initialize GlassFlow client
client = Client(host="your-glassflow-etl-url")
```

### Create a pipeline

The example below uses pipeline version `v3`. See [Migrating from V2 to V3](#migrating-from-v2-to-v3) if you have existing `v2` configurations.

```python
pipeline_config = {
"version": "v2",
"version": "v3",
"pipeline_id": "my-pipeline-id",
"source": {
"type": "kafka",
"connection_params": {
"brokers": [
"http://my.kafka.broker:9093"
],
"protocol": "PLAINTEXT",
"mechanism": "NO_AUTH"
},
"topics": [
{
"consumer_group_initial_offset": "latest",
"name": "users",
"deduplication": {
"enabled": True,
"id_field": "event_id",
"id_field_type": "string",
"time_window": "1h"
}
}
]
},
"join": {
"enabled": False
"type": "kafka",
"connection_params": {
"brokers": ["http://my.kafka.broker:9093"],
"protocol": "PLAINTEXT",
"mechanism": "NO_AUTH"
},
"topics": [
{
"id": "users",
"name": "users",
"consumer_group_initial_offset": "latest",
"deduplication": {
"enabled": True,
"key": "event_id",
"time_window": "1h"
},
"schema_fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "created_at", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
]
},
"join": {"enabled": False},
"sink": {
"type": "clickhouse",
"host": "http://my.clickhouse.server",
"port": "9000",
"database": "default",
"username": "default",
"password": "c2VjcmV0",
"secure": False,
"max_batch_size": 1000,
"max_delay_time": "30s",
"table": "users_dedup"
},
"schema": {
"fields": [
{
"source_id": "users",
"name": "event_id",
"type": "string",
"column_name": "event_id",
"column_type": "UUID"
"type": "clickhouse",
"source_id": "users",
"connection_params": {
"host": "http://my.clickhouse.server",
"port": "9000",
"database": "default",
"username": "default",
"password": "mysecret",
"secure": False
},
{
"source_id": "users",
"field_name": "user_id",
"column_name": "user_id",
"column_type": "UUID"
},
{
"source_id": "users",
"name": "created_at",
"type": "string",
"column_name": "created_at",
"column_type": "DateTime"
},
{
"source_id": "users",
"name": "name",
"type": "string",
"column_name": "name",
"column_type": "String"
},
{
"source_id": "users",
"name": "email",
"type": "string",
"column_name": "email",
"column_type": "String"
}
]
"mapping": [
{"name": "event_id", "column_name": "event_id", "column_type": "UUID"},
{"name": "user_id", "column_name": "user_id", "column_type": "UUID"},
{"name": "created_at", "column_name": "created_at", "column_type": "DateTime"},
{"name": "name", "column_name": "name", "column_type": "String"},
{"name": "email", "column_name": "email", "column_type": "String"}
]
}
}

# Create a pipeline
pipeline = client.create_pipeline(pipeline_config)
```

For full configuration reference — including Schema Registry, joins, OTLP sources, and resource controls — see the [GlassFlow docs](https://docs.glassflow.dev/configuration/pipeline-json-reference).

## Get pipeline
### Get pipeline

```python
# Get a pipeline by ID
pipeline = client.get_pipeline("my-pipeline-id")
```

Expand All @@ -145,87 +120,67 @@ pipeline = client.get_pipeline("my-pipeline-id")
```python
pipelines = client.list_pipelines()
for pipeline in pipelines:
print(f"Pipeline ID: {pipeline['pipeline_id']}")
print(f"Name: {pipeline['name']}")
print(f"Transformation Type: {pipeline['transformation_type']}")
print(f"Created At: {pipeline['created_at']}")
print(f"State: {pipeline['state']}")
print(f"Pipeline ID: {pipeline['pipeline_id']}, State: {pipeline['state']}")
```

### Stop / Terminate / Resume Pipeline
### Stop / Terminate / Resume pipeline

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.stop()
print(pipeline.status)
```

```
STOPPING
pipeline.stop() # graceful stop → STOPPING
client.stop_pipeline("my-pipeline-id", terminate=True) # ungraceful → TERMINATING
pipeline.resume() # restart → RESUMING
```

```python
# Stop a pipeline ungracefully (terminate)
client.stop_pipeline("my-pipeline-id", terminate=True)
print(pipeline.status)
```
### Delete pipeline

```
TERMINATING
```
Only stopped or terminated pipelines can be deleted.

```python
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.resume()
print(pipeline.status)
```

```
RESUMING
client.delete_pipeline("my-pipeline-id")
# or
pipeline.delete()
```

### Delete pipeline
## Migrating from V2 to V3

Only stopped or terminated pipelines can be deleted.
Pipeline version `v2` has been removed. Use `migrate_pipeline_v2_to_v3()` to convert an existing configuration automatically:

```python
# Delete a pipeline
client.delete_pipeline("my-pipeline-id")
from glassflow.etl import migrate_pipeline_v2_to_v3

# Or delete via pipeline instance
pipeline.delete()
v3_config = migrate_pipeline_v2_to_v3(v2_config)
pipeline = client.create_pipeline(v3_config)
```

## Pipeline Configuration
If you prefer to migrate manually, the key changes are:

For detailed information about the pipeline configuration, see [GlassFlow docs](https://docs.glassflow.dev/configuration/pipeline-json-reference).
| Area | V2 | V3 |
|------|----|----|
| `version` | `"v2"` | `"v3"` |
| Topics | no `id` field | `id: "<topic-name>"` required |
| Schema | top-level `schema.fields` block | `source.topics[].schema_fields` per topic |
| Sink connection | flat fields (`host`, `port`, …) at top level | nested `sink.connection_params` object |
| Sink field mapping | top-level `schema.fields` with `source_id` | `sink.mapping` list of `{name, column_name, column_type}` |
| Deduplication key | `id_field` | `key` |
| Join key | `join_key` | `key` |
| Sink password | base64-encoded | plain text |

## Tracking

The SDK includes anonymous usage tracking to help improve the product. Tracking is enabled by default but can be disabled in two ways:
The SDK includes anonymous usage stats collection to help improve the product. It collects non-identifying information such as SDK version, Python version, and feature flags (e.g., whether joins or deduplication are enabled). No personally identifiable information is collected.

Usage stats collection is enabled by default. To disable it:

1. Using an environment variable:
```bash
export GF_TRACKING_ENABLED=false
export GF_USAGESTATS_ENABLED=false
```

2. Programmatically using the `disable_usagestats` method:
```python
from glassflow.etl import Client

client = Client(host="my-glassflow-host")
client.disable_tracking()
client.disable_usagestats()
```

The tracking collects anonymous information about:
- SDK version
- Platform (operating system)
- Python version
- Pipeline ID
- Whether joins or deduplication are enabled
- Kafka security protocol, auth mechanism used and whether authentication is disabled
- Errors during pipeline creation and deletion

## Development

### Setup
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.0
4.0.0
2 changes: 2 additions & 0 deletions src/glassflow/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
SourceConfig,
)
from .pipeline import Pipeline
from .utils import migrate_pipeline_v2_to_v3

__all__ = [
"Pipeline",
Expand All @@ -20,4 +21,5 @@
"SourceConfig",
"SinkConfig",
"JoinConfig",
"migrate_pipeline_v2_to_v3",
]
2 changes: 1 addition & 1 deletion src/glassflow/etl/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ def delete_pipeline(self, pipeline_id: str) -> None:
"""
Pipeline(host=self.host, pipeline_id=pipeline_id).delete()

def disable_tracking(self) -> None:
def disable_usagestats(self) -> None:
    """Disable anonymous usage stats collection for this client.

    Sets the internal tracker's ``enabled`` flag to False so no further
    usage events are reported.
    """
    self._tracking.enabled = False
Loading
Loading