-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_batch_data.py
More file actions
66 lines (55 loc) · 1.82 KB
/
generate_batch_data.py
File metadata and controls
66 lines (55 loc) · 1.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import psycopg2
from faker import Faker
import csv
from kafka import KafkaProducer
import json
import time

# Local data generator: seeds a Postgres `orders` table, writes a CSV of web
# logs, and streams synthetic events to a Kafka topic — all with Faker data.
fake = Faker()

# Kafka producer (local setup)
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# PostgreSQL connection (local setup)
# NOTE(review): credentials are hardcoded — move them to environment
# variables (e.g. os.environ) before committing/sharing this script.
conn = psycopg2.connect(
dbname="postgres", user="ranjanshahajishitole", password="1234", host="localhost", port="5432"
)
cur = conn.cursor()

try:
    # Create orders table (idempotent — IF NOT EXISTS).
    cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
order_id SERIAL PRIMARY KEY,
customer_id VARCHAR(50),
product VARCHAR(100),
price FLOAT,
order_date TIMESTAMP
);
""")

    # Generate and insert batch data into PostgreSQL.
    # Values are passed as parameters (%s), never interpolated into the SQL.
    print("Generating batch data for PostgreSQL...")
    for _ in range(100):
        cur.execute(
            "INSERT INTO orders (customer_id, product, price, order_date) VALUES (%s, %s, %s, %s)",
            (fake.uuid4(), fake.word(), fake.random_int(10, 500), fake.date_time_this_year())
        )
    # Single commit covers the table creation and all 100 inserts.
    conn.commit()

    # Generate CSV file for batch logs.
    print("Generating CSV logs...")
    with open('web_logs.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['timestamp', 'user_id', 'action'])
        for _ in range(100):
            writer.writerow([fake.date_time_this_month(), fake.uuid4(), fake.random_element(['click', 'view'])])

    # Stream data to Kafka.
    print("Streaming data to Kafka...")
    for _ in range(100):
        stream_data = {
            'user_id': fake.uuid4(),
            'action': fake.random_element(['click', 'purchase']),
            'product': fake.word(),
            'price': fake.random_int(5, 500),
            'timestamp': fake.date_time_this_month().isoformat()
        }
        producer.send('ecommerce-stream', json.dumps(stream_data).encode('utf-8'))
        time.sleep(0.1)  # Simulate real-time streaming

    # BUG FIX: KafkaProducer.send() is asynchronous and only buffers the
    # message. Without flush() the script can exit before delivery and
    # silently drop buffered records.
    producer.flush()
finally:
    # Cleanup runs on both success and failure so connections never leak.
    producer.close()
    cur.close()
    conn.close()

print("Local data generation complete!")