Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,4 @@ cython_debug/
# refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore
*.sqlite
16 changes: 4 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,18 @@ class User(CommonBase):

# getters.py

from sqlalchemy_sync_data.getters import PostgresGetter
from sqlalchemy.engine import URL
from sqlalchemy_sync_data.getters import SQLiteGetter


class UserGetter(PostgresGetter):
class UserGetter(SQLiteGetter):
template_query = """select id, first_name, last_name, email from users"""
connection_settings = URL(
drivername="postgresql+psycopg2"
username="user",
password="password",
host="localhost",
port=5432,
database="database",
)
connection_settings = {"database": "sqlalchemy_sync_data.sqlite"}

# handlers.py

from sqlalchemy_sync_data.handlers import BaseHandler
from .getters import UserGetter
from .model import User
from .models import User

class UserHandler(BaseHandler):
model = User
Expand Down
Empty file added example/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions example/getters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from sqlalchemy_sync_data.getters import SQLiteGetter


class UserGetter(SQLiteGetter):
template_query = """select id, first_name, last_name, email from users"""
connection_settings = {"database": "sqlalchemy_sync_data.sqlite"}
17 changes: 17 additions & 0 deletions example/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sqlalchemy_sync_data.handlers import BaseHandler

from .getters import UserGetter
from .models import User


class UserHandler(BaseHandler):
model = User
db_fields_to_model_mapping = {
"id": "id",
"first_name": "first_name",
"last_name": "last_name",
"email": "username",
}
field_name_as_external_id = "id"
getter_class = UserGetter
connection_settings = {"url": "sqlite:///sqlalchemy_sync_data.sqlite"}
13 changes: 13 additions & 0 deletions example/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

CommonBase = declarative_base()


class User(CommonBase):
__tablename__ = "old_users"

id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False)
first_name = Column(String(255), nullable=False)
last_name = Column(String(255), nullable=False)
username = Column(String(255), nullable=False)
7 changes: 7 additions & 0 deletions example/synchronizators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from sqlalchemy_sync_data.synchronizator import BaseSyncronizator

from .handlers import UserHandler


class UserSyncronizator(BaseSyncronizator):
handler_classes = (UserHandler,)
22 changes: 15 additions & 7 deletions sqlalchemy_sync_data/connectors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sqlite3
import threading
import typing
from datetime import datetime
Expand All @@ -25,7 +26,7 @@ class BasicConnector(threading.Thread, typing.Generic[BaseCursorType]):
to fetch data as you prefer. Also may be used as a separate thread.

Params:
connection_settings - dict with connection settings from settings.py.
connection_settings - connection settings.
query - SQL query as string, e.g. SELECT id, code, name FROM products.
output_queue - Required if running connector as separate process). Queue object to put fetching results in it.
error_message - Error message.
Expand All @@ -36,7 +37,7 @@ class BasicConnector(threading.Thread, typing.Generic[BaseCursorType]):

def __init__(
self,
connection_settings: dict,
connection_settings,
query,
output_queue: Queue = Queue(),
error_message: str = "ERROR",
Expand All @@ -51,8 +52,7 @@ def __init__(
super().__init__(**kwargs)

def make_connection_string(self):
formatted_string = "{server}{database}{db_username}{db_password}".format(**self._connection_settings)
return formatted_string
return self._connection_settings

def get_cursor(self): ...

Expand Down Expand Up @@ -98,10 +98,14 @@ def run(self):
logger.exception("The error occurred while trying to fetch data from external database")


class PostgresConnector(BasicConnector):
def make_connection_string(self):
return self._connection_settings
class SQLiteConnector(BasicConnector):
def get_cursor(self):
connect = sqlite3.connect(**self.make_connection_string())
connect.row_factory = sqlite3.Row
return connect.cursor()


class PostgresConnector(BasicConnector):
def get_cursor(self):
cursor = psycopg2.connect(**self.make_connection_string(), cursor_factory=DictCursor).cursor()
return cursor
Expand All @@ -113,6 +117,10 @@ def __init__(self, connection_settings, query, output_queue=Queue(), **kwargs):
self._ordered_fields_with_type = ordered_fields_with_type
super().__init__(connection_settings=connection_settings, query=raw_query, output_queue=output_queue, **kwargs)

def make_connection_string(self):
formatted_string = "{server}{database}{db_username}{db_password}".format(**self._connection_settings)
return formatted_string

def get_cursor(self):
return HttpClickHouseCursor.connect(
**self._connection_settings, query=self._query, ordered_fields_with_type=self._ordered_fields_with_type
Expand Down
3 changes: 1 addition & 2 deletions sqlalchemy_sync_data/cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ class BaseCursor(ABC):
connection_timeout: int

@abstractmethod
@classmethod
def connect(cls, *args, **kwargs): ...
def connect(self, *args, **kwargs): ...

@abstractmethod
def fetchone(self): ...
Expand Down
8 changes: 7 additions & 1 deletion sqlalchemy_sync_data/getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import typing
from string import Formatter

from .connectors import BasicConnector, HttpClickHouseConnector, PostgresConnector
from .connectors import BasicConnector, HttpClickHouseConnector, PostgresConnector, SQLiteConnector

logger = logging.getLogger(__name__)
BasicConnectorType = typing.TypeVar("BasicConnectorType", bound=BasicConnector)
Expand Down Expand Up @@ -81,6 +81,12 @@ def response(self):
return self.get_response()


class SQLiteGetter(BaseGetter):
"""Base class for getting data from SQLite."""

connector = SQLiteConnector


class PostgresGetter(BaseGetter):
"""Base class for getting data from Postgres."""

Expand Down
90 changes: 90 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker


@pytest.fixture(scope="session")
def sqlite_file_path(tmp_path_factory):
file_path = "sqlalchemy_sync_data.sqlite"
yield file_path


@pytest.fixture(scope="session")
def database_url(sqlite_file_path) -> str:
return f"sqlite:///{sqlite_file_path}"


@pytest.fixture(scope="session")
def engine(database_url):
return create_engine(database_url)


@pytest.fixture(scope="session")
def SessionLocal(engine):
return sessionmaker(engine, autoflush=True)


@pytest.fixture(scope="function")
def db_session(engine, SessionLocal, Base):
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

with SessionLocal() as session:
yield session

Base.metadata.drop_all(engine)


@pytest.fixture(scope="session")
def Base():
return declarative_base()


@pytest.fixture(scope="session")
def User(Base):
class User(Base): # type: ignore[misc, valid-type]
__tablename__ = "users"

id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False)
first_name = Column(String(255), nullable=False)
last_name = Column(String(255), nullable=False)
email = Column(String(255), nullable=False)

return User


@pytest.fixture(scope="session")
def OldUser(Base):
class OldUser(Base): # type: ignore[misc, valid-type]
__tablename__ = "old_users"

id = Column(Integer, autoincrement=True, unique=True, primary_key=True, nullable=False)
first_name = Column(String(255), nullable=False)
last_name = Column(String(255), nullable=False)
username = Column(String(255), nullable=False)

return OldUser


@pytest.fixture(scope="function")
def users(db_session, User, OldUser):
user_instances = [
User(
first_name="User1",
last_name="User1",
email="user1@mail.com",
),
User(
first_name="User2",
last_name="User2",
email="user2@mail.com",
),
User(
first_name="User3",
last_name="User3",
email="user3@mail.com",
),
]
db_session.add_all(user_instances)
db_session.commit()
yield user_instances
1 change: 0 additions & 1 deletion tests/test_handlers.py

This file was deleted.

22 changes: 22 additions & 0 deletions tests/test_synchronizators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from sqlalchemy import func

from example.models import User
from example.synchronizators import UserSyncronizator


def test_syncronizator(users, db_session):
syncronizator = UserSyncronizator()
syncronizator.run()

assert db_session.query(func.count(User.id)).scalar() == len(users)

users_with_id = {instance.id: instance for instance in users}
old_users_with_id = {instance.id: instance for instance in db_session.query(User).all()}

for _id, user in users_with_id.items():
old_user = old_users_with_id[_id]

for field_name in ("id", "first_name", "last_name"):
assert getattr(user, field_name) == getattr(old_user, field_name)

assert user.email == old_user.username
Loading