Name: Aman Panigrahi CNetID: amanpanigrahi23
For this assignment, I built a data pipeline to move records from a MySQL production database (Sakila) into a SQLite database designed for analytics. I used the SQLAlchemy ORM to handle the object-relational impedance mismatch between the database tables and my Python code. Following the Data Mapper pattern, I kept the database logic separate from the application to make it easier to maintain. The target database uses a simplified structure that flattens complex joins into easier-to-read dimension and fact tables.
To run this project, ensure your environment is configured as follows:
- Virtual Environment:
  python -m venv venv
  ./venv/Scripts/activate # Windows
  pip install -r requirements.txt
- Environment Variables & Connection Strings:
  The connection settings are managed in db/session.py. You must update SOURCE_URL to include your local MySQL credentials:
  - SOURCE_URL: mysql+mysqlconnector://root:YOUR_PASSWORD@localhost/sakila
  - TARGET_URL: sqlite:///analytics.db
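As a minimal sketch of how the connection strings could be assembled without hard-coding the password, the snippet below builds SOURCE_URL from an environment variable. The function name build_source_url and the variable name MYSQL_PASSWORD are assumptions for illustration and are not part of the project. The password is percent-encoded because characters such as @ or / would otherwise break URL parsing.

```python
import os
from urllib.parse import quote_plus


def build_source_url(password, user="root", host="localhost", database="sakila"):
    """Return a MySQL connection URL with the password percent-encoded.

    Hypothetical helper: the project itself simply edits SOURCE_URL by hand.
    """
    return f"mysql+mysqlconnector://{user}:{quote_plus(password)}@{host}/{database}"


# MYSQL_PASSWORD is an assumed variable name; fall back to the README placeholder.
SOURCE_URL = build_source_url(os.environ.get("MYSQL_PASSWORD", "YOUR_PASSWORD"))
TARGET_URL = "sqlite:///analytics.db"
```

Reading the password from the environment keeps credentials out of version control. The values in db/session.py can still be edited directly as described above.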
The analytics database uses a star schema to reduce join complexity. Instead of jumping between many small tables, I flattened the data into clear dimensions, such as customers and films, and facts, such as rentals. I chose SQLite for the target because it is a fast, self-contained, and portable RDBMS that needs no server setup: the entire database is stored in a single file.
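The flattening idea can be sketched as follows. This example uses the standard-library sqlite3 module with plain SQL rather than the SQLAlchemy ORM, and an in-memory database stands in for the MySQL source. The source table and column names follow the Sakila schema. The dim_customer column layout and the sample row are assumptions made up for illustration.

```python
import sqlite3

# In-memory database standing in for the source; the sample row is invented.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE country  (country_id INTEGER PRIMARY KEY, country TEXT);
    CREATE TABLE city     (city_id INTEGER PRIMARY KEY, city TEXT, country_id INTEGER);
    CREATE TABLE address  (address_id INTEGER PRIMARY KEY, address TEXT, city_id INTEGER);
    CREATE TABLE customer (customer_id INTEGER PRIMARY KEY, first_name TEXT,
                           last_name TEXT, address_id INTEGER);

    INSERT INTO country  VALUES (1, 'Exampleland');
    INSERT INTO city     VALUES (1, 'Springfield', 1);
    INSERT INTO address  VALUES (1, '1 Example St', 1);
    INSERT INTO customer VALUES (1, 'Ada', 'Lovelace', 1);

    -- Hypothetical dimension layout: one wide row per customer.
    CREATE TABLE dim_customer (customer_key INTEGER PRIMARY KEY, first_name TEXT,
                               last_name TEXT, address TEXT, city TEXT, country TEXT);

    -- Resolve the address -> city -> country chain once, at load time.
    INSERT INTO dim_customer
    SELECT c.customer_id, c.first_name, c.last_name, a.address, ci.city, co.country
    FROM customer AS c
    JOIN address AS a  ON a.address_id = c.address_id
    JOIN city    AS ci ON ci.city_id = a.city_id
    JOIN country AS co ON co.country_id = ci.country_id;
""")

# Analysis queries now read a single table with no joins.
row = conn.execute(
    "SELECT first_name, city, country FROM dim_customer WHERE customer_key = 1"
).fetchone()
```

The three joins are paid once during the load, so queries against the dimension need none.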
- I joined Address, City, and Country into a single DimCustomer table using the ORM to avoid multi-table joins during analysis.
- I utilized SQLite's dynamic typing system, where types are associated with values rather than columns.
- By using SQLAlchemy, I decoupled the business logic from the database interaction, making the system more maintainable and easier to test.
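The dynamic typing mentioned above can be observed directly with the standard-library sqlite3 module. In this small sketch, the table name demo and the sample values are made up for illustration. A column declared INTEGER accepts any value, and SQLite's typeof() reports the storage class of each stored value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (val INTEGER)")

# The declared type gives the column an INTEGER *affinity*, not a constraint:
# text that looks like an integer is converted, other values are stored as-is.
conn.executemany(
    "INSERT INTO demo (val) VALUES (?)",
    [(42,), ("42",), ("abc",), (1.5,)],
)

types = [r[0] for r in conn.execute("SELECT typeof(val) FROM demo ORDER BY rowid")]
print(types)
```

Because the target will not reject a mistyped value on its own, type checks are better enforced in the ORM models or in the validation step.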
The pipeline is managed via a command-line interface built with the Click library.
- Initialize Database:
  python cli.py init
  - Creates the analytics.db file and all tables defined in the ORM models.
  - Populates the dim_date table for time-series analysis.
- Full Data Load:
  python cli.py full-load
  - Performs a one-time migration of all records from MySQL to the analytics tables.
- Incremental Sync:
  python cli.py incremental
  - Uses a sync_state table to identify and sync only records modified since the last run.
- Data Validation:
  python cli.py validate
  - Compares row counts between source and target to verify data integrity.
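The incremental sync and row-count validation steps can be sketched as follows. This is a simplified illustration that uses the standard-library sqlite3 module instead of SQLAlchemy, with two in-memory databases standing in for MySQL and analytics.db. The last_update column comes from the Sakila schema. The sync_state and dim_film layouts, the function names, and the sample rows are assumptions, not the project's actual code.

```python
import sqlite3

source = sqlite3.connect(":memory:")  # stand-in for the MySQL source
target = sqlite3.connect(":memory:")  # stand-in for analytics.db

source.execute(
    "CREATE TABLE film (film_id INTEGER PRIMARY KEY, title TEXT, last_update TEXT)"
)
target.executescript("""
    CREATE TABLE dim_film (film_id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE sync_state (table_name TEXT PRIMARY KEY, last_synced TEXT);
""")


def incremental_sync(source, target):
    """Copy film rows changed since the stored watermark; return rows synced."""
    row = target.execute(
        "SELECT last_synced FROM sync_state WHERE table_name = 'film'"
    ).fetchone()
    watermark = row[0] if row else ""  # empty string sorts before any timestamp
    changed = source.execute(
        "SELECT film_id, title, last_update FROM film "
        "WHERE last_update > ? ORDER BY last_update",
        (watermark,),
    ).fetchall()
    for film_id, title, _ in changed:
        # INSERT OR REPLACE covers both new and updated records.
        target.execute(
            "INSERT OR REPLACE INTO dim_film (film_id, title) VALUES (?, ?)",
            (film_id, title),
        )
    if changed:
        # Advance the watermark to the newest timestamp just processed.
        target.execute(
            "INSERT OR REPLACE INTO sync_state VALUES ('film', ?)",
            (changed[-1][2],),
        )
    target.commit()
    return len(changed)


def counts_match(source, target):
    """Validation check: compare row counts between source and target."""
    src = source.execute("SELECT COUNT(*) FROM film").fetchone()[0]
    tgt = target.execute("SELECT COUNT(*) FROM dim_film").fetchone()[0]
    return src == tgt


source.execute("INSERT INTO film VALUES (1, 'Film A', '2024-01-01 00:00:00')")
source.execute("INSERT INTO film VALUES (2, 'Film B', '2024-01-02 00:00:00')")
first = incremental_sync(source, target)   # both rows are new

source.execute(
    "UPDATE film SET title = 'Film A v2', last_update = '2024-01-03 00:00:00' "
    "WHERE film_id = 1"
)
second = incremental_sync(source, target)  # only the updated row is copied
```

One design caveat: the strict `>` comparison can miss a row written with exactly the same timestamp as the stored watermark. A production version might use `>=` together with an idempotent upsert. Matching row counts also do not detect deleted or divergent rows, so the check is a sanity test rather than a full comparison.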
I used pytest to verify the pipeline logic.
- Command: python -m pytest
- Verification: The test suite confirms successful initialization, full-loading, and the accuracy of the incremental sync logic (handling both new and updated records).
Results: 5 passed, 0 failed.
