From befb3238ef1085ca3bc5cdf18f021463f5392ca8 Mon Sep 17 00:00:00 2001 From: "Sergey V. Elfimov" Date: Tue, 17 Jun 2025 15:46:19 +0300 Subject: [PATCH] feat (test, readme, docs): Add tests, example, sqlite source sync and change readme. --- .gitignore | 1 + README.md | 16 ++---- example/__init__.py | 0 example/getters.py | 6 ++ example/handlers.py | 17 ++++++ example/models.py | 13 +++++ example/synchronizators.py | 7 +++ sqlalchemy_sync_data/connectors.py | 22 +++++--- sqlalchemy_sync_data/cursors.py | 3 +- sqlalchemy_sync_data/getters.py | 8 ++- tests/conftest.py | 90 ++++++++++++++++++++++++++++++ tests/test_handlers.py | 1 - tests/test_synchronizators.py | 22 ++++++++ 13 files changed, 183 insertions(+), 23 deletions(-) create mode 100644 example/__init__.py create mode 100644 example/getters.py create mode 100644 example/handlers.py create mode 100644 example/models.py create mode 100644 example/synchronizators.py delete mode 100644 tests/test_handlers.py create mode 100644 tests/test_synchronizators.py diff --git a/.gitignore b/.gitignore index 7c5b80b..1308d05 100644 --- a/.gitignore +++ b/.gitignore @@ -192,3 +192,4 @@ cython_debug/ # refer to https://docs.cursor.com/context/ignore-files .cursorignore .cursorindexingignore +*.sqlite diff --git a/README.md b/README.md index e863019..98b03f6 100644 --- a/README.md +++ b/README.md @@ -23,26 +23,18 @@ class User(CommonBase): # getters.py -from sqlalchemy_sync_data.getters import PostgresGetter -from sqlalchemy.engine import URL +from sqlalchemy_sync_data.getters import SQLiteGetter -class UserGetter(PostgresGetter): +class UserGetter(SQLiteGetter): template_query = """select id, first_name, last_name, email from users""" - connection_settings = URL( - drivername="postgresql+psycopg2" - username="user", - password="password", - host="localhost", - port=5432, - database="database", - ) + connection_settings = {"database": "sqlalchemy_sync_data.sqlite"} # handlers.py from sqlalchemy_sync_data.handlers import BaseHandler from .getters import UserGetter -from .model import User +from .models import User class UserHandler(BaseHandler): model = User diff --git a/example/__init__.py b/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/getters.py b/example/getters.py new file mode 100644 index 0000000..4f9a01e --- /dev/null +++ b/example/getters.py @@ -0,0 +1,6 @@ +from sqlalchemy_sync_data.getters import SQLiteGetter + + +class UserGetter(SQLiteGetter): + template_query = """select id, first_name, last_name, email from users""" + connection_settings = {"database": "sqlalchemy_sync_data.sqlite"} diff --git a/example/handlers.py b/example/handlers.py new file mode 100644 index 0000000..8ba420f --- /dev/null +++ b/example/handlers.py @@ -0,0 +1,17 @@ +from sqlalchemy_sync_data.handlers import BaseHandler + +from .getters import UserGetter +from .models import User + + +class UserHandler(BaseHandler): + model = User + db_fields_to_model_mapping = { + "id": "id", + "first_name": "first_name", + "last_name": "last_name", + "email": "username", + } + field_name_as_external_id = "id" + getter_class = UserGetter + connection_settings = {"url": "sqlite:///sqlalchemy_sync_data.sqlite"} diff --git a/example/models.py b/example/models.py new file mode 100644 index 0000000..cc27713 --- /dev/null +++ b/example/models.py @@ -0,0 +1,13 @@ +from sqlalchemy import Column, Integer, String +from sqlalchemy.orm import declarative_base + +CommonBase = declarative_base() + + +class User(CommonBase): + __tablename__ = "old_users" + + id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False) + first_name = Column(String(255), nullable=False) + last_name = Column(String(255), nullable=False) + username = Column(String(255), nullable=False) diff --git a/example/synchronizators.py b/example/synchronizators.py new file mode 100644 index 0000000..2cf3164 --- /dev/null +++ b/example/synchronizators.py @@ -0,0 +1,7 @@ +from sqlalchemy_sync_data.synchronizator import BaseSyncronizator + +from .handlers import UserHandler + + +class UserSyncronizator(BaseSyncronizator): + handler_classes = (UserHandler,) diff --git a/sqlalchemy_sync_data/connectors.py b/sqlalchemy_sync_data/connectors.py index 54a922e..2964989 100644 --- a/sqlalchemy_sync_data/connectors.py +++ b/sqlalchemy_sync_data/connectors.py @@ -1,4 +1,5 @@ import logging +import sqlite3 import threading import typing from datetime import datetime @@ -25,7 +26,7 @@ class BasicConnector(threading.Thread, typing.Generic[BaseCursorType]): to fetch data as you prefer. Also may be used as a separate thread. Params: - connection_settings - dict with connection settings from settings.py. + connection_settings - connection settings. query - SQL query as string, e.g. SELECT id, code, name FROM products. output_queue - Required if running connector as separate process). Queue object to put fetching results in it. error_message - Error message. @@ -36,7 +37,7 @@ class BasicConnector(threading.Thread, typing.Generic[BaseCursorType]): def __init__( self, - connection_settings: dict, + connection_settings, query, output_queue: Queue = Queue(), error_message: str = "ERROR", @@ -51,8 +52,7 @@ def __init__( super().__init__(**kwargs) def make_connection_string(self): - formatted_string = "{server}{database}{db_username}{db_password}".format(**self._connection_settings) - return formatted_string + return self._connection_settings def get_cursor(self): ... @@ -98,10 +98,14 @@ def run(self): logger.exception("The error occurred while trying to fetch data from external database") -class PostgresConnector(BasicConnector): - def make_connection_string(self): - return self._connection_settings +class SQLiteConnector(BasicConnector): + def get_cursor(self): + connect = sqlite3.connect(**self.make_connection_string()) + connect.row_factory = sqlite3.Row + return connect.cursor() + +class PostgresConnector(BasicConnector): def get_cursor(self): cursor = psycopg2.connect(**self.make_connection_string(), cursor_factory=DictCursor).cursor() return cursor @@ -113,6 +117,10 @@ def __init__(self, connection_settings, query, output_queue=Queue(), **kwargs): self._ordered_fields_with_type = ordered_fields_with_type super().__init__(connection_settings=connection_settings, query=raw_query, output_queue=output_queue, **kwargs) + def make_connection_string(self): + formatted_string = "{server}{database}{db_username}{db_password}".format(**self._connection_settings) + return formatted_string + def get_cursor(self): return HttpClickHouseCursor.connect( **self._connection_settings, query=self._query, ordered_fields_with_type=self._ordered_fields_with_type diff --git a/sqlalchemy_sync_data/cursors.py b/sqlalchemy_sync_data/cursors.py index 27ff58b..ccee19f 100644 --- a/sqlalchemy_sync_data/cursors.py +++ b/sqlalchemy_sync_data/cursors.py @@ -7,8 +7,7 @@ class BaseCursor(ABC): connection_timeout: int @abstractmethod - @classmethod - def connect(cls, *args, **kwargs): ... + def connect(self, *args, **kwargs): ... @abstractmethod def fetchone(self): ... diff --git a/sqlalchemy_sync_data/getters.py b/sqlalchemy_sync_data/getters.py index 32ef3f2..4af075f 100644 --- a/sqlalchemy_sync_data/getters.py +++ b/sqlalchemy_sync_data/getters.py @@ -2,7 +2,7 @@ import typing from string import Formatter -from .connectors import BasicConnector, HttpClickHouseConnector, PostgresConnector +from .connectors import BasicConnector, HttpClickHouseConnector, PostgresConnector, SQLiteConnector logger = logging.getLogger(__name__) BasicConnectorType = typing.TypeVar("BasicConnectorType", bound=BasicConnector) @@ -81,6 +81,12 @@ def response(self): return self.get_response() +class SQLiteGetter(BaseGetter): + """Base class for getting data from SQLite.""" + + connector = SQLiteConnector + + class PostgresGetter(BaseGetter): """Base class for getting data from Postgres.""" diff --git a/tests/conftest.py b/tests/conftest.py index e69de29..22edef6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -0,0 +1,90 @@ +import pytest +from sqlalchemy import Column, Integer, String, create_engine +from sqlalchemy.orm import declarative_base, sessionmaker + + +@pytest.fixture(scope="session") +def sqlite_file_path(tmp_path_factory): + file_path = "sqlalchemy_sync_data.sqlite" + yield file_path + + +@pytest.fixture(scope="session") +def database_url(sqlite_file_path) -> str: + return f"sqlite:///{sqlite_file_path}" + + +@pytest.fixture(scope="session") +def engine(database_url): + return create_engine(database_url) + + +@pytest.fixture(scope="session") +def SessionLocal(engine): + return sessionmaker(engine, autoflush=True) + + +@pytest.fixture(scope="function") +def db_session(engine, SessionLocal, Base): + Base.metadata.drop_all(engine) + Base.metadata.create_all(engine) + + with SessionLocal() as session: + yield session + + Base.metadata.drop_all(engine) + + +@pytest.fixture(scope="session") +def Base(): + return declarative_base() + + +@pytest.fixture(scope="session") +def User(Base): + class User(Base): # type: ignore[misc, valid-type] + __tablename__ = "users" + + id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False) + first_name = Column(String(255), nullable=False) + last_name = Column(String(255), nullable=False) + email = Column(String(255), nullable=False) + + return User + + +@pytest.fixture(scope="session") +def OldUser(Base): + class OldUser(Base): # type: ignore[misc, valid-type] + __tablename__ = "old_users" + + id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False) + first_name = Column(String(255), nullable=False) + last_name = Column(String(255), nullable=False) + username = Column(String(255), nullable=False) + + return OldUser + + +@pytest.fixture(scope="function") +def users(db_session, User, OldUser): + user_instances = [ + User( + first_name="User1", + last_name="User1", + email="user1@mail.com", + ), + User( + first_name="User2", + last_name="User2", + email="user2@mail.com", + ), + User( + first_name="User3", + last_name="User3", + email="user3@mail.com", + ), + ] + db_session.add_all(user_instances) + db_session.commit() + yield user_instances diff --git a/tests/test_handlers.py b/tests/test_handlers.py deleted file mode 100644 index 590dff9..0000000 --- a/tests/test_handlers.py +++ /dev/null @@ -1 +0,0 @@ -def test_first(): ... diff --git a/tests/test_synchronizators.py b/tests/test_synchronizators.py new file mode 100644 index 0000000..f5f3c12 --- /dev/null +++ b/tests/test_synchronizators.py @@ -0,0 +1,22 @@ +from sqlalchemy import func + +from example.models import User +from example.synchronizators import UserSyncronizator + + +def test_syncronizator(users, db_session): + syncronizator = UserSyncronizator() + syncronizator.run() + + assert db_session.query(func.count(User.id)).scalar() == len(users) + + users_with_id = {instance.id: instance for instance in users} + old_users_with_id = {instance.id: instance for instance in db_session.query(User).all()} + + for _id, user in users_with_id.items(): + old_user = old_users_with_id[_id] + + for field_name in ("id", "first_name", "last_name"): + assert getattr(user, field_name) == getattr(old_user, field_name) + + assert user.email == old_user.username