Conversation
Walkthrough

Postgres connection reuse and retry infrastructure introduced via a new PostgresBase; ReaderPostgres and WriterPostgres now reuse a cached connection and execute preloaded SQL. Tests and SQL files added; minor config/docs updates for Dependabot and copilot instructions.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Lambda as Lambda Handler
    participant PostgresBase as PostgresBase
    participant Secrets as AWS SecretsManager
    participant Postgres as PostgreSQL
    Lambda->>PostgresBase: request connection / run operation (read/write)
    alt first access
        PostgresBase->>Secrets: load_postgres_config(secret_name, region)
        Secrets-->>PostgresBase: secret JSON
        PostgresBase->>Postgres: psycopg2.connect(options, **cfg)
        Postgres-->>PostgresBase: connection
        PostgresBase-->>Lambda: provide cached connection
    else cached connection exists
        PostgresBase-->>Lambda: return cached connection
    end
    Lambda->>Postgres: execute SQL (via cached connection)
    alt OperationalError
        Postgres-->>PostgresBase: OperationalError
        PostgresBase->>PostgresBase: close cached connection, retry (once)
        PostgresBase->>Postgres: connect again
        Postgres-->>PostgresBase: connection
        PostgresBase-->>Lambda: retry operation
    end
    Postgres-->>Lambda: query result / acknowledgment
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches 🧪 Generate unit tests (beta)
🧹 Nitpick comments (2)
tests/unit/readers/test_reader_postgres.py (1)

385-405: Unused variable `pagination` flagged by static analysis.

The variable `pagination` on line 402 is unpacked but never used. Consider using an underscore prefix to indicate it's intentionally unused.

🔧 Suggested fix

```diff
- rows, pagination = reader.read_stats(limit=10)
+ rows, _pagination = reader.read_stats(limit=10)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/unit/readers/test_reader_postgres.py` around lines 385 - 405, The test test_retries_on_operational_error unpacks reader.read_stats into rows, pagination but never uses pagination; rename the unused variable to _pagination (or simply use an underscore `_`) to satisfy static analysis and indicate intentional non-use. Update the line that calls reader.read_stats in test_retries_on_operational_error to unpack as rows, _pagination (or rows, _) so the test behavior (asserting connect call count and rows) remains unchanged while eliminating the unused-variable warning.

src/writers/writer_postgres.py (1)
60-60: Consider adding explicit connection cleanup for non-Lambda deployments.

The cached connection has no explicit `close()` mechanism. While this works well for Lambda (connections naturally close when the container terminates), long-running processes or integration tests may benefit from explicit cleanup. The `Writer` base class could be extended with an optional `close()` method.

This is not blocking for the current Lambda use case.
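A minimal sketch of the optional `close()` hook the comment suggests - the `Writer` base class and `_connection` attribute names are taken from the review, everything else (the fake connection, constructor) is illustrative:

```python
from typing import Any, Optional

class Writer:
    """Illustrative base class; the real project's Writer may differ."""

    def __init__(self) -> None:
        self._connection: Optional[Any] = None

    def close(self) -> None:
        """Optional cleanup hook; safe to call repeatedly or before connecting."""
        if self._connection is not None:
            self._connection.close()
            self._connection = None

class FakeConnection:
    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1

writer = Writer()
writer.close()                      # no connection yet: a no-op
writer._connection = FakeConnection()
conn = writer._connection
writer.close()
writer.close()                      # idempotent: second call does nothing
assert conn.close_calls == 1
assert writer._connection is None
```

Lambda deployments can simply never call it, while integration tests get a deterministic teardown point.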
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/writers/writer_postgres.py` at line 60, Add an explicit cleanup hook that closes the cached DB connection: extend the Writer base class with an optional close() method and implement it in the Postgres writer to call and null out self._connection.close() (or await if async) when a connection exists; update any connection-creating methods that set self._connection to ensure close() will be safe to call, and add a brief unit test or integration cleanup call to exercise Writer.close() in long-running tests or processes to avoid leaked connections.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6c04b6fc-8b9c-4a86-a7cd-246694e57bde
📒 Files selected for processing (7)
- .github/copilot-instructions.md
- .github/dependabot.yml
- src/readers/reader_postgres.py
- src/writers/writer_postgres.py
- tests/integration/test_connection_reuse.py
- tests/unit/readers/test_reader_postgres.py
- tests/unit/writers/test_writer_postgres.py
```python
self._secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
self._secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
self._db_config: dict[str, Any] | None = None
self._connection: Any | None = None
```
If we were using mypy strict mode, I am not sure this would be allowed without a warning. It's nice that you use types, but a type that is an optional Any value does not say that much.

I tried to fix this pattern in the PR. In some cases it is not really possible. For example in this one, the connection is created dynamically, so it has no exact type, as far as I found. Look at the current solution. I created types where possible and left a comment in the cases where the Any has its place, IMO.
```python
    raise RuntimeError("Failed to load database configuration.")
return config

def _get_connection(self) -> Any:
```
This is a pretty common pattern actually - there is `from functools import cached_property` for exactly these things: caching a property on a class. It's more native and intuitive. Pseudo-code:

```python
class DB:
    @cached_property
    def conn(self):
        return create_connection()
```

And it should also be lazy by default, so the connection would be established on the first call, not eagerly.

I implemented that approach in a few places in the current solution; see the new commits.
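A runnable sketch of that lazy, cached behavior (the connection object here is a stand-in string, not a real psycopg2 connection):

```python
from functools import cached_property

class DB:
    """Illustrative only; `_make_connection` stands in for psycopg2.connect."""

    def __init__(self) -> None:
        self.connect_calls = 0

    def _make_connection(self) -> str:
        self.connect_calls += 1
        return "connection-object"

    @cached_property
    def conn(self) -> str:
        # Evaluated on first access, then stored on the instance.
        return self._make_connection()

db = DB()
assert db.connect_calls == 0      # lazy: nothing connected yet
first = db.conn                   # first access triggers the connect
second = db.conn                  # cached: no second connect
assert db.connect_calls == 1

# Dropping the cached value forces a reconnect on the next access,
# which is one way an OperationalError retry could invalidate the cache:
del db.conn
assert db.conn == "connection-object"
assert db.connect_calls == 2
```

Note that `del instance.conn` is the documented way to invalidate a `cached_property`, which maps naturally onto "drop the cached connection on OperationalError".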
```python
user=db_config["user"],
password=db_config["password"],
port=db_config["port"],
options="-c statement_timeout=30000 -c default_transaction_read_only=on",
```
Consider extracting this 30s timeout into a constant somewhere.

I have refactored the solution so this timeout is now in the constants.py file. Please see the new version.
I saw this call earlier already, hmm. I might revisit the whole structure actually.

But this should not belong here - DB config loading and validation, which is what's implemented in the next few lines. Why not put this into a 'get_conn' method or so?

See the new state. A PostgresBase class was created, which holds some of the logic, so the logic is more evenly distributed.
I saw these queries. A couple of upgrade ideas, if you want; you can pick either (ordered from low to higher effort & practice):

- Put them at least into triple double-quotes, i.e. `""" query here """` - multi-line strings are perfect for this.
- Put them into a separate SQL file and load it. Combining SQL and Python is a bad practice. In simple projects like this it's okay, but as projects scale this is not maintainable (not to mention the typical engineering practices - formatting, testing, discovery of these SQL files).
- Put them into a separate JINJA2 file - with this, you can parametrize it from Python.
- My most favourite option - AIOSQL: https://nackjicholson.github.io/aiosql/; again the idea is to have a separate file with the SQL that you load and work with.
[TL;DR, but please read the above still]

On the last option, AIOSQL: https://nackjicholson.github.io/aiosql/

I think that this is actually the simplest, most clear, and fastest option at the same time (I elaborated the previous options mostly as a quick list of ideas for how I have seen this done in projects across years and years of doing this). If you are bored with doing this manually, just tell this to the Copilot and it might blow you away how clean and quick a solution it can give you (and it's also a good exercise for us to work on our 'AI agentic / context engineering' muscle memory :D), preferably using Opus 4.6 for this. Or even 4.7 if it's available - it was released 2 days ago :) https://www.anthropic.com/news/claude-opus-4-7

Thanks! That was a whole new way how to handle SQL files inside the Python file. I implemented that solution, please see the result.
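aiosql may not be installed everywhere, so here is a minimal hand-rolled sketch of the same separation it provides: SQL lives in its own file, annotated with `-- name:` markers, and Python looks queries up by name. The file and query names are hypothetical stand-ins for files like `src/readers/sql/stats.sql`:

```python
import re
import tempfile
from pathlib import Path

def load_queries(sql_path: Path) -> dict[str, str]:
    """Parse a .sql file whose queries are marked with '-- name: <name>'."""
    queries: dict[str, str] = {}
    current = None
    for line in sql_path.read_text().splitlines():
        match = re.match(r"--\s*name:\s*(\w+)", line)
        if match:
            current = match.group(1)
            queries[current] = ""
        elif current is not None:
            queries[current] += line + "\n"
    return {name: sql.strip() for name, sql in queries.items()}

# Demo with a throwaway file standing in for the project's stats.sql:
sql_file = Path(tempfile.mkdtemp()) / "stats.sql"
sql_file.write_text(
    "-- name: read_stats\n"
    "SELECT run_id, status FROM runs LIMIT %(limit)s;\n"
)
queries = load_queries(sql_file)
assert "read_stats" in queries
assert queries["read_stats"].startswith("SELECT")
```

With aiosql itself the loader collapses to `aiosql.from_path("sql/", "psycopg2")`, and the queries become callable attributes; the point either way is that every SQL statement in the codebase is discoverable in one place.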
```python
raw_rows = db_cursor.fetchall()
connection.rollback()
break
except OperationalError as exc:
```
This method does way too much. It loads the DB config, validates it, performs retries, manipulates the cursor, unpacks and post-processes the values. Split it please; it's hard to read, hard to test, hard to extend.

My advice: you try first. Think about it, reason about it in your head - what are the responsibilities of the individual logical execution blocks? You will need more methods - how many, the previous question might tell you (if it's too many, say 3-4+, it might smell like a separate moderately-or-bigger piece, maybe used in at least 2 places, not just one; put it onto a class). Then maybe consider Copilot with Opus 4.6/4.7 to refactor it and see what happens. Then learn from it, and consider asking repeatedly but now with more specification/exactness about how it should refactor it - you provide the 'tutorial' for it. Etc. This is what I am doing with architectural questions, refactoring tasks, even when coding projects completely from scratch. I think this is one of the skills to have. AI will do what you tell it to do.

I did not implement this suggestion directly; it kind of came along with the other changes. Please look at whether it seems alright to you.
Yet another embedded SQL - I am discovering these more and more as I read the code. Seriously consider using AIOSQL :)

Cost / reason: imagine that someone will want to see all the ways we use the database - like all the queries etc. With the current approach, where SQL statements are Python strings all over the codebase, that is very hard to do.

This change will now also help with the following task, where we add and implement other aggregated queries for management.
```python
with connection.cursor() as cursor:
    if topic_name == "public.cps.za.dlchange":
        self._postgres_edla_write(cursor, table_info["main"], message)
    elif topic_name == "public.cps.za.runs":
```
Consider extracting these topic names into a common constant - maybe a frozen data class or so.

These topics were added in commit 731d259. I used a frozen data class in the solution as well. Thank you for showing that approach.
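A sketch of the frozen-dataclass approach: the topic values come from the snippet above, while the class name, field names, and `route` helper are illustrative:

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class KafkaTopics:
    DL_CHANGE: str = "public.cps.za.dlchange"
    RUNS: str = "public.cps.za.runs"

TOPICS = KafkaTopics()

def route(topic_name: str) -> str:
    # Dispatch against named constants instead of string literals.
    if topic_name == TOPICS.DL_CHANGE:
        return "edla_write"
    if topic_name == TOPICS.RUNS:
        return "runs_write"
    return "unknown"

assert route("public.cps.za.runs") == "runs_write"

# Frozen: accidental reassignment fails loudly at runtime.
try:
    TOPICS.RUNS = "other.topic"
except FrozenInstanceError:
    pass
assert TOPICS.RUNS == "public.cps.za.runs"
```

Compared to bare module-level strings, the frozen dataclass groups the topics under one importable name and makes mutation an error instead of a silent bug.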
```python
db_cursor.execute(query, params)
col_names = [desc[0] for desc in db_cursor.description]  # type: ignore[union-attr]
raw_rows = db_cursor.fetchall()
connection.rollback()
```
Rollback on read? What am I missing here?

I understand, but this is the correct way to leave a cached connection in a state fit for reuse. psycopg2 (with autocommit off) automatically opens a transaction, even for a plain SELECT. Until that transaction is explicitly closed, the connection is stuck in an "in transaction" state, so we use a rollback. I added a comment to this logic to make it clearer:

```python
# Rollback closes the implicit transaction opened by the SELECT,
# leaving the cached connection in a clean idle state for reuse.
```
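The pattern can be sketched with a mocked connection so it runs standalone - `read_rows` is a hypothetical helper, not the project's actual method, and it also guards the rollback so a rollback failure can never mask the original query error:

```python
from unittest import mock

def read_rows(connection, query, params=None):
    """Illustrative read helper: always release the implicit transaction."""
    with connection.cursor() as cur:
        try:
            cur.execute(query, params)
            rows = cur.fetchall()
        finally:
            # Rollback closes the implicit transaction psycopg2 opened for
            # the SELECT, leaving the cached connection idle for reuse.
            try:
                connection.rollback()
            except Exception:
                pass  # never let a rollback failure replace the real error
    return rows

conn = mock.MagicMock()
cursor = conn.cursor.return_value.__enter__.return_value
cursor.fetchall.return_value = [("run-1", "ok")]
rows = read_rows(conn, "SELECT run_id, status FROM runs")
assert rows == [("run-1", "ok")]
conn.rollback.assert_called_once()
```

An alternative for read-only paths is enabling autocommit on the connection, which avoids the implicit transaction entirely.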
```python
break
except OperationalError as exc:
    self._connection = None
    if attempt > 0:
```
What am I missing here - is the retry even working? Would it not fail after the first attempt?

Or is it that the RuntimeError is non-retriable?
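For reference, a self-contained retry loop of the shape under discussion - `OperationalError` is stubbed here as a stand-in for psycopg2's exception, and the constants and names are illustrative, not the project's actual code:

```python
class OperationalError(Exception):
    """Stand-in for psycopg2.OperationalError."""

MAX_RETRIES = 1  # one reconnect attempt after the initial failure

def execute_with_retry(operation):
    last_exc = None
    for attempt in range(MAX_RETRIES + 1):
        try:
            return operation(attempt)
        except OperationalError as exc:
            last_exc = exc  # here the real code would drop the cached connection
    raise RuntimeError("Operation failed after retries.") from last_exc

calls = []
def flaky(attempt):
    calls.append(attempt)
    if attempt == 0:
        raise OperationalError("connection dropped")
    return "ok"

assert execute_with_retry(flaky) == "ok"
assert calls == [0, 1]
```

In this shape the retry does run once more after the first OperationalError; only when every attempt fails does the non-retriable RuntimeError surface, which is the distinction the question above is probing.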
```python
break
except OperationalError:
    self._connection = None
    if attempt > 0:
```
Consider improving the logs - in a similar way to how the Reader is done. In fact, maybe you can extract some of this into a common class handling connections and retries. Just a quick idea; I would need to think about it more, so just consider it. I know the connection settings for reading and writing are slightly different (but that could be parametrized or so).
I find the title to be a bit misleading considering the implementation. There is no connection pooling, just caching implemented manually. Either change the title and PR description to say that the connection is reshared / cached, or introduce actual pooling.

One idea on the connection pooling: check how many connections Aurora, with our current infrastructure setup, can tolerate. It's good to be setting this up on the 'application' (Lambda) side, but for this you must understand the Lambda execution lifecycle: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html because some things can be implemented in a 'shared' way between individual Lambda invocations, like connection pooling, but it's a slightly more advanced topic and your current approach would need to change significantly. Something like: the connection pool must be mutable and at global level, at the place where the lambda handler is called, I think. This is completely possible and would lead to more robustness (having fewer bottlenecks on DB connections - not that we have bottlenecks currently?), just saying that it's more work.

I would read the documentation I sent you and then ask the AI about this (you can learn how it works and check the approach and your code).

From the above, see

Two more resources:
Actionable comments posted: 9
🧹 Nitpick comments (1)
tests/unit/utils/test_postgres_base.py (1)

76-86: Move PostgreSQL/AWS-secret mocks into shared fixtures.

These tests define external-service mocks inline in several places. Please centralize the `pb.psycopg2` and `load_postgres_config` mocks in `conftest.py` fixtures to keep the unit-suite isolation consistent.

As per coding guidelines, `tests/unit/**/*.py`: Unit tests must mock external services (Kafka, EventBridge, PostgreSQL, S3) via `conftest.py`.

Also applies to: 108-139, 187-220
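A sketch of what such centralized mocking could look like. It is written with a `contextmanager` and a `SimpleNamespace` stand-in for `src.utils.postgres_base` so it runs standalone; in `conftest.py` the same body would be wrapped in `@pytest.fixture` and patch the real module:

```python
import types
from contextlib import contextmanager
from unittest import mock

FAKE_SECRET = {
    "database": "testdb", "host": "localhost",
    "user": "tester", "password": "secret", "port": 5432,
}

@contextmanager
def patched_postgres(module):
    """Patch the module's load_postgres_config and psycopg2 in one place."""
    with mock.patch.object(module, "load_postgres_config",
                           return_value=dict(FAKE_SECRET)) as loader, \
         mock.patch.object(module, "psycopg2") as pg:
        yield loader, pg

# Demo against a stand-in for src.utils.postgres_base:
pb = types.SimpleNamespace(load_postgres_config=lambda *a: {}, psycopg2=object())

with patched_postgres(pb) as (loader, pg):
    assert pb.load_postgres_config("secret", "eu-west-1") == FAKE_SECRET
    assert loader.call_count == 1
# Outside the context manager the original attributes are restored.
```

Tests then take the fixture as an argument instead of repeating inline `with patch(...)` blocks, so the suite's isolation rules live in one file.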
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/utils/test_postgres_base.py` around lines 76 - 86, Tests inline-patch external services (pb.psycopg2 and load_postgres_config) inside test_postgres_base.py; move those mocks into reusable fixtures in conftest.py and update tests to use them. Create fixtures in conftest.py that patch src.utils.postgres_base.load_postgres_config (returning the secret dict) and pb.psycopg2 connection behavior, then remove the inline with patch(...) blocks from test_pg_config_builds_correct_values and related tests (including the ones at 108-139 and 187-220) and accept the fixtures (e.g., postgres_config_mock, psycopg2_mock) as function args so tests simply access base._pg_config and other behavior without in-test patching. Ensure fixture scope is appropriate (function or module) and that mocks expose call_count/assertions used by the tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/readers/reader_postgres.py`:
- Around line 159-162: The finally block currently calls connection.rollback()
unguarded which can raise and replace the original execute() error; wrap the
rollback call in its own try/except so any exception from connection.rollback()
is caught and logged/suppressed, and ensure the cached connection is discarded
(close the connection and clear whatever cache variable holds it, e.g. set
self._cached_connection = None or call the connection-cache cleanup helper) so a
bad connection isn’t reused; also make sure to re-raise the original exception
from the execute path (do not let rollback exceptions replace it).
In `@src/utils/postgres_base.py`:
- Around line 55-68: The _build_postgres_config currently masks missing/invalid
fields by coercing to empty strings and port 0; change it to validate required
keys from aws_secret and raise clear errors instead of silent defaults: in
_build_postgres_config check aws_secret contains non-empty
"database","host","user","password" and that "port" is present and a positive
integer (raise ValueError with descriptive messages on failures), then construct
and return PostgresConfig using those validated values so malformed AWS secrets
fail fast; reference the function _build_postgres_config, the aws_secret input
dict, and the PostgresConfig constructor when making the changes.
- Around line 123-142: The _execute_with_retry method currently retries any
operation (operation: Callable[..., T]) on OperationalError which can duplicate
non-idempotent writes; change the API so retries are opt-in: add a parameter
(e.g. retry: bool or idempotent: bool) to _execute_with_retry and require
callers of writer paths to pass retry=True only when they guarantee idempotency,
or alternatively require callers to supply an idempotency token/callback; inside
_execute_with_retry (referencing POSTGRES_MAX_RETRIES, _get_connection,
_close_connection, OperationalError) only perform the retry loop when the new
flag indicates it is safe, otherwise raise immediately after the first
OperationalError with the original exception preserved.
- Around line 99-109: The code calling psycopg2.connect via connect_kwargs lacks
a connect_timeout, which can cause Lambda functions to hang on network/DNS
stalls; update the connection construction in the method that builds
connect_kwargs (referencing connect_kwargs, pg_config, _connect_options and
psycopg2.connect) to include a connect_timeout value (make it configurable via
pg_config or a class attribute with a sensible default, e.g., 5–10 seconds) and
ensure that value is added to connect_kwargs before calling psycopg2.connect so
self._connection is created with the timeout applied.
In `@src/writers/writer_postgres.py`:
- Around line 188-189: The exception log currently builds a string into err_msg
and calls logger.exception(err_msg) which forces eager formatting and may omit a
trailing period; change the call to use lazy logging with logger.exception("The
Postgres writer failed with unknown error: %s.", e) (or logger.exception("The
Postgres writer failed with unknown error: %s.", str(e))) and keep err_msg for
the return value if needed; update the message to ensure it ends with a period
and reference the err_msg variable and logger.exception call in your edit.
- Around line 185-186: The retry wrapper around _write_topic via
_execute_with_retry is re-running non-idempotent INSERT batches (which include
connection.commit()) and can cause duplicate rows if the commit actually
succeeded but the client perceived failure; update the implementation to make
retries idempotent by either modifying the INSERT statements in
src/writers/sql/inserts.sql to include an ON CONFLICT ... DO NOTHING/DO UPDATE
targeting event_id, or ensure a UNIQUE constraint exists on event_id in the DB
schema, or change _execute_with_retry/_write_topic so retries only occur before
any DML is sent (i.e., detect/avoid retrying after commit); pick one of these
fixes and apply it consistently to _write_topic and the SQL files.
In `@tests/unit/readers/test_reader_postgres.py`:
- Line 405: The test currently unpacks a second value from
reader.read_stats(limit=10) into pagination but never uses it, triggering
RUF059; change the unpack target to an intentionally unused variable (e.g.,
_pagination or _) when calling reader.read_stats in the test so the linter knows
the value is intentionally ignored while still preserving the two-value
unpacking for the read_stats function.
In `@tests/unit/utils/test_postgres_base.py`:
- Around line 41-47: The test currently asserts empty-string/0 defaults for
missing DB fields; instead make the behavior fail-fast by updating
_build_postgres_config to validate required fields (database, host, user,
password, port) and raise a clear exception (e.g., ValueError) when any are
missing or invalid, and change
test_build_postgres_config_defaults_for_missing_keys to assert that calling
_build_postgres_config({}) raises that exception; reference the
_build_postgres_config function for where to add the validation and the
test_build_postgres_config_defaults_for_missing_keys for the assertion
change.
In `@tests/unit/writers/test_writer_postgres.py`:
- Around line 188-192: Replace manual assignments/deletions of
type(writer)._pg_config in tests (e.g., test_write_skips_when_no_database) with
monkeypatch to avoid leaks: use the monkeypatch fixture to call
monkeypatch.setattr(type(writer), "_pg_config", property(lambda self:
{"database": ""})) before invoking WriterPostgres.write and remove the manual
del; apply this pattern for all tests that override _pg_config (e.g., those
referencing WriterPostgres and _pg_config at the listed lines).
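The idempotent-retry fix described above (an `ON CONFLICT` clause keyed on `event_id`) can be sketched like this. It uses SQLite's compatible `ON CONFLICT` syntax so it runs standalone; the `runs` table and columns are hypothetical stand-ins for the project's schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (event_id TEXT PRIMARY KEY, payload TEXT)")

INSERT = """
INSERT INTO runs (event_id, payload) VALUES (?, ?)
ON CONFLICT (event_id) DO NOTHING
"""

def write_batch(rows):
    # `with conn` commits on success and rolls back on error.
    with conn:
        conn.executemany(INSERT, rows)

batch = [("evt-1", "a"), ("evt-2", "b")]
write_batch(batch)
write_batch(batch)  # a retried batch is now harmless: duplicates are skipped

count = conn.execute("SELECT COUNT(*) FROM runs").fetchone()[0]
assert count == 2
```

With the inserts made idempotent this way, re-running a batch after an ambiguous commit outcome can no longer duplicate rows, which is exactly what makes the retry wrapper safe around writes.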
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 641ba377-cd75-4685-b525-f0b77848a77c
📒 Files selected for processing (15)
- .github/copilot-instructions.md
- requirements.txt
- src/handlers/handler_topic.py
- src/readers/reader_postgres.py
- src/readers/sql/stats.sql
- src/utils/config_loader.py
- src/utils/constants.py
- src/utils/postgres_base.py
- src/utils/utils.py
- src/writers/sql/inserts.sql
- src/writers/writer_postgres.py
- tests/unit/readers/test_reader_postgres.py
- tests/unit/utils/test_postgres_base.py
- tests/unit/utils/test_trace_logging.py
- tests/unit/writers/test_writer_postgres.py
✅ Files skipped from review due to trivial changes (5)
- requirements.txt
- .github/copilot-instructions.md
- src/utils/utils.py
- src/readers/sql/stats.sql
- src/writers/sql/inserts.sql
Overview
This pull request introduces connection caching and reuse for both PostgreSQL readers and writers, improving efficiency and reliability by maintaining a single connection per instance. It also adds robust reconnection logic, updates tests to reflect the new behavior, and enhances configuration for dependency updates.
Related
Closes #115