# dynamo-io

dynamo-io is an opinionated Python library for working with DynamoDB single-table designs. It provides:
- Dataclass-based record models
- Schema definitions for primary keys, secondary index keys, and typed attributes
- Helpers for inserts, upserts, deletes, point reads, partition queries, and indexed queries
- Response wrappers for raw rows and deserialized records
- An in-memory mock DynamoDB client and table helpers for unit and scenario tests
The package targets Python >=3.11,<4.0 and works with the low-level boto3 DynamoDB client API.
## Installation

Using pip:

```shell
pip install dynamo-io
```

Using Poetry:

```shell
poetry add dynamo-io
```

## Data model

This library is built around a normalized single-table shape:
- The primary partition key lives in `pk`
- The primary sort key lives in `sk`
- Additional indexed key columns may live in `g1k`, `g2k`, and `g3k`
- The common metadata columns `created_at`, `updated_at`, and `expires_at` are included on every `Record`
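Under these conventions, a single stored item might look like the following in DynamoDB's low-level attribute-value format. This is an illustrative sketch of the table shape, not output produced by the library; the values are invented.

```python
# Illustrative only: one item in the single-table layout described above,
# expressed in DynamoDB's low-level attribute-value format.
example_row = {
    "pk": {"S": "product:123"},       # primary partition key
    "sk": {"S": "sku:red-small"},     # primary sort key
    "g1k": {"S": "category:shirts"},  # first secondary index key
    "name": {"S": "Red Small Shirt"},
    "inventory": {"N": "10"},         # numbers are stored as strings
    "created_at": {"S": "2026-01-01T00:00:00Z"},  # metadata from Record
    "updated_at": {"S": "2026-01-01T00:00:00Z"},
}
```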
It expects a low-level DynamoDB client, for example:

```python
import boto3

client = boto3.client("dynamodb")
```

Do not pass a DynamoDB resource/table object. The functions call client methods such as `get_item`, `update_item`, `batch_write_item`, and `get_paginator`.
## Quickstart

```python
import dataclasses
import datetime

import boto3

import dynamo_io as dio


@dataclasses.dataclass(frozen=True)
class Product(dio.Record):
    product_id: dio.TypeHints.KeyColumn = None
    sku: dio.TypeHints.KeyColumn = None
    category_key: dio.TypeHints.String = None
    name: dio.TypeHints.String = None
    inventory: dio.TypeHints.Integer = None
    launched_at: dio.TypeHints.Datetime = None

    schema: dio.SchemaType = dio.Schema(
        partition_key=dio.PartitionColumn("product_id", "product:"),
        sort_key=dio.SortColumn("sku", "sku:"),
        columns=(
            dio.GlobalFirstColumn("category_key", "category:"),
            dio.StringColumn("name"),
            dio.IntegerColumn("inventory"),
            dio.DatetimeColumn("launched_at"),
        ),
    )


client = boto3.client("dynamodb")
table_name = "catalog"

record = Product(
    product_id="product:123",
    sku="sku:red-small",
    category_key="category:shirts",
    name="Red Small Shirt",
    inventory=10,
    launched_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
)

dio.upsert(client, table_name, record)

result = dio.get_record(
    client=client,
    table_name=table_name,
    source=Product(product_id="product:123", sku="sku:red-small"),
)
loaded = result.record
```

## Defining records

Create a dataclass that subclasses `dio.Record` and assign a `schema` class variable.
```python
@dataclasses.dataclass(frozen=True)
class ExampleRecord(dio.Record):
    account_id: dio.TypeHints.KeyColumn = None
    object_id: dio.TypeHints.KeyColumn = None
    status: dio.TypeHints.String = None
    enabled: dio.TypeHints.Boolean = None

    schema: dio.SchemaType = dio.Schema(
        partition_key=dio.PartitionColumn("account_id", "account:"),
        sort_key=dio.SortColumn("object_id", "object:"),
        columns=(
            dio.StringColumn("status"),
            dio.BooleanColumn("enabled"),
        ),
    )
```

- `schema` must be a class variable with the type hint `dio.SchemaType`
- The base `Record` class always contributes `created_at`, `updated_at`, and `expires_at`
- A record can omit the sort key by setting `sort_key=None` in the schema
- Field values are serialized directly; this library does not prepend key prefixes for you
- In practice, key values should already be in the exact DynamoDB form you want stored, such as `"product:123"` or `"sku:red-small"`
### Schema columns

- `PartitionColumn(name, value_prefix)` defines the field that maps to table key `pk`
- `SortColumn(name, value_prefix)` defines the field that maps to table key `sk`
- `GlobalFirstColumn(name, value_prefix)` maps to `g1k`
- `GlobalSecondColumn(name, value_prefix)` maps to `g2k`
- `GlobalThirdColumn(name, value_prefix)` maps to `g3k`
- `Column(name, data_type)` is the generic typed field definition
- `StringColumn`, `IntegerColumn`, `DatetimeColumn`, etc. are typed conveniences
- `MapColumn(name, children=...)` describes nested map attributes
- Any column can use `key="actual_dynamo_attribute_name"` to store under a different attribute name
- Any column can use `computed=True` to write the field but exclude it when materializing records from rows
Available `TypeHints` aliases:

`TypeHints.Boolean`, `TypeHints.BinarySet`, `TypeHints.String`, `TypeHints.Bytes`, `TypeHints.Date`, `TypeHints.Datetime`, `TypeHints.Float`, `TypeHints.Integer`, `TypeHints.Index`, `TypeHints.StringSet`, `TypeHints.FloatSet`, `TypeHints.IntegerSet`, `TypeHints.Timestamp`, `TypeHints.List`, `TypeHints.Map`, `TypeHints.KeyColumn`

Available typed column helpers:

`BooleanColumn`, `BinarySetColumn`, `StringColumn`, `BytesColumn`, `DateColumn`, `DatetimeColumn`, `FloatColumn`, `FloatSetColumn`, `IntegerColumn`, `IntegerSetColumn`, `ListColumn`, `MapColumn`, `StringSetColumn`, `TimestampColumn`
## Writing data

### `upsert`

`upsert(client, table_name, record)` writes a single record with `UpdateItem`.

- Uses the record's primary key as `Key`
- Builds `ExpressionAttributeNames`, `ExpressionAttributeValues`, and `UpdateExpression`
- Returns a `SingleRecordResponse`
- Uses `if_not_exists` for `created_at`
- Always writes the latest `updated_at` if present on the record

```python
result = dio.upsert(client, "catalog", record)
updated_record = result.record
```

### `insert_records`

`insert_records(client, table_name, records)` performs a batch write of `PutRequest` items.
- Accepts any iterable of `Record` instances
- Retries unprocessed items with exponential backoff for up to 10 attempts
- Raises `RuntimeError` if items still remain unprocessed
- Returns a basic `Response`

```python
dio.insert_records(client, "catalog", [record_a, record_b, record_c])
```

Use this when you want batch insert behavior rather than attribute-level updates.
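The retry behavior described above can be sketched roughly as follows. This is an illustrative loop, not the library's implementation; the function name, the backoff constants, and the stub client are all invented for the example.

```python
import time

MAX_ATTEMPTS = 10  # the documented retry cap


def batch_write_with_retries(client, table_name, put_requests):
    """Sketch: batch write with exponential backoff on unprocessed items."""
    pending = list(put_requests)
    for attempt in range(MAX_ATTEMPTS):
        response = client.batch_write_item(RequestItems={table_name: pending})
        pending = response.get("UnprocessedItems", {}).get(table_name, [])
        if not pending:
            return
        time.sleep(min(2 ** attempt * 0.05, 2.0))  # exponential, capped
    raise RuntimeError(f"{len(pending)} items remain unprocessed")


# Tiny stub client that leaves everything unprocessed on the first call,
# then succeeds, just to demonstrate the retry loop.
class _FlakyClient:
    def __init__(self):
        self.calls = 0

    def batch_write_item(self, RequestItems):
        self.calls += 1
        if self.calls == 1:
            return {"UnprocessedItems": RequestItems}
        return {"UnprocessedItems": {}}


stub = _FlakyClient()
batch_write_with_retries(stub, "catalog", [{"PutRequest": {"Item": {}}}])
```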
### `transacts`

`transacts(client, table_name, puts=None, updates=None, deletes=None)` builds a single `transact_write_items` request.

```python
dio.transacts(
    client=client,
    table_name="catalog",
    puts=[
        Product(product_id="product:1", sku="sku:a"),
    ],
    updates=[
        Product(product_id="product:2", sku="sku:b", inventory=20),
    ],
    deletes=[
        Product(product_id="product:3", sku="sku:c"),
    ],
)
```

### `remove`

`remove(client, table_name, record)` deletes one record by its primary key.
```python
dio.remove(
    client=client,
    table_name="catalog",
    record=Product(product_id="product:123", sku="sku:red-small"),
)
```

### Removing attributes with `dio.DELETE`

Use the sentinel `dio.DELETE` to remove an attribute from an existing item during upsert or transactional update generation.

```python
record = Product(
    product_id="product:123",
    sku="sku:red-small",
    name=dio.DELETE,
)
dio.upsert(client, "catalog", record)
```

Empty strings are also treated as removals in update expressions. `None` means "leave unchanged" for update generation and is omitted from serialized writes.
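Those three-way semantics (`None` skipped, `DELETE` and empty string removed, everything else set) can be sketched as a small update-expression builder. This is an illustrative stand-in, not the library's code: the `DELETE` sentinel is emulated with a local object, and all values are treated as strings for simplicity.

```python
DELETE = object()  # local stand-in for the dio.DELETE sentinel


def build_update_expression(fields):
    """Sketch of UpdateExpression generation under the documented rules."""
    sets, removes, names, values = [], [], {}, {}
    for i, (attr, value) in enumerate(fields.items()):
        if value is None:
            continue  # None means "leave unchanged" and is omitted
        name = f"#a{i}"
        names[name] = attr
        if value is DELETE or value == "":
            removes.append(name)  # DELETE sentinel and "" both remove
        else:
            placeholder = f":v{i}"
            values[placeholder] = {"S": value}
            sets.append(f"{name} = {placeholder}")
    parts = []
    if sets:
        parts.append("SET " + ", ".join(sets))
    if removes:
        parts.append("REMOVE " + ", ".join(removes))
    return " ".join(parts), names, values


expr, names, values = build_update_expression(
    {"name": DELETE, "color": "red", "status": None}
)
```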
## Reading rows

These helpers return DynamoDB rows in the low-level attribute-value format.

### `get_row`

Read one row by primary key.

```python
result = dio.get_row(
    client=client,
    table_name="catalog",
    partition_key_value="product:123",
    sort_key_value="sku:red-small",
)
row = result.row
```

### `get_rows_for_partition`

Query a partition, optionally filtering by sort key prefix or range bounds.
```python
result = dio.get_rows_for_partition(
    client=client,
    table_name="catalog",
    partition_key_value="category:shirts",
    sort_key_starts="product:",
    index=dio.Indexes.G1_PARTITION,
    limit=100,
)
```

Parameters:

- `sort_key_starts`: builds a `begins_with` condition on the queried index sort key
- `before_sort_key`: adds a `<` condition on the sort key
- `after_sort_key`: adds a `>` condition on the sort key
- `index`: defaults to `Indexes.STANDARD`
- `limit`: optional DynamoDB query limit
### `get_indexed_row`

Query an index directly by explicit partition/sort values:

```python
result = dio.get_indexed_row(
    client=client,
    table_name="catalog",
    partition_key_value="category:shirts",
    sort_key_value="product:123",
    index=dio.Indexes.G1_PARTITION,
)
```

## Reading records

These helpers deserialize matching rows back into `Record` instances.
### `get_record`

Fetch one record by primary key using a partially populated source record.

```python
result = dio.get_record(
    client=client,
    table_name="catalog",
    source=Product(product_id="product:123", sku="sku:red-small"),
)
record = result.record
```

### `get_records_for_partition`

Query multiple rows and map them to known record classes.
```python
result = dio.get_records_for_partition(
    client=client,
    table_name="catalog",
    partition_key_value="category:shirts",
    index=dio.Indexes.G1_PARTITION,
    record_classes=[Product],
)
records = result.records
```

Notes:

- Only rows whose schema matches one of the provided `record_classes` are returned as records
- Rows with unknown fields or mismatched key prefixes are skipped
- The raw rows remain available on `result.rows`
### `get_indexed_record`

Query an index using a source record to supply the relevant index key values.

```python
result = dio.get_indexed_record(
    client=client,
    table_name="catalog",
    source=Product(
        category_key="category:shirts",
        product_id="product:123",
    ),
    index=dio.Indexes.G1_PARTITION,
)
```

For source-based indexed queries, the library inspects the source record's schema to find the fields that correspond to the requested index's partition and sort key attributes.
## Scanning

`read_entire_table(client, table_name, max_page_count=100)` performs a scan across the whole table.

```python
result = dio.read_entire_table(client, "catalog", max_page_count=25)
```

This is explicitly a debugging helper. It returns a `ScannedRowResponse` with `rows`, `pages`, and `completed`. If the scan exceeds `max_page_count`, `completed` is `False`.
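A capped scan of this shape can be sketched with a paginator loop. The helper function and stub client below are illustrative, not the library's implementation; the stub only mimics the `get_paginator("scan")` interface far enough to demonstrate the page cap.

```python
def scan_with_page_cap(client, table_name, max_page_count):
    """Sketch: scan the table but stop after max_page_count pages."""
    rows, pages, completed = [], 0, True
    paginator = client.get_paginator("scan")
    for page in paginator.paginate(TableName=table_name):
        pages += 1
        rows.extend(page.get("Items", []))
        if pages >= max_page_count:
            completed = False  # more pages may remain unread
            break
    return rows, pages, completed


# Minimal stub standing in for a DynamoDB client, for demonstration only.
class _StubPaginator:
    def __init__(self, pages):
        self._pages = pages

    def paginate(self, **kwargs):
        return iter(self._pages)


class _StubClient:
    def __init__(self, pages):
        self._pages = pages

    def get_paginator(self, operation_name):
        return _StubPaginator(self._pages)


stub = _StubClient([{"Items": [1, 2]}, {"Items": [3]}, {"Items": [4]}])
rows, pages, completed = scan_with_page_cap(stub, "catalog", max_page_count=2)
```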
## Indexes

The package exposes predeclared `Indexes` values that describe common key layouts:

`Indexes.STANDARD`, `Indexes.INVERTED`, `Indexes.PARTITION_G1`, `Indexes.PARTITION_G2`, `Indexes.PARTITION_G3`, `Indexes.SORT_G1`, `Indexes.SORT_G2`, `Indexes.SORT_G3`, `Indexes.G1_PARTITION`, `Indexes.G1_SORT`, `Indexes.G1_G2`, `Indexes.G1_G3`, `Indexes.G2_PARTITION`, `Indexes.G2_SORT`, `Indexes.G2_G1`, `Indexes.G2_G3`, `Indexes.G3_PARTITION`, `Indexes.G3_SORT`, `Indexes.G3_G1`, `Indexes.G3_G2`

Each `Index` definition carries `id`, `name`, `partition_key`, and `sort_key`.

These objects are used by the query helpers to build `KeyConditionExpression` and optional `IndexName` values.
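How an index definition turns into query parameters can be sketched as follows. The `Index` dataclass mirrors the four fields listed above, but the concrete index names, the builder function, and its output shape are all assumptions made for illustration, not the library's actual definitions.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class Index:
    id: str
    name: Optional[str]  # None for the table's standard key layout
    partition_key: str
    sort_key: str


# Hypothetical declarations; real ids/names may differ.
STANDARD = Index("standard", None, "pk", "sk")
G1_PARTITION = Index("g1-partition", "g1-partition-index", "g1k", "pk")


def build_query_kwargs(table_name, index, partition_value):
    """Sketch: build Query kwargs from an index definition."""
    kwargs = {
        "TableName": table_name,
        "KeyConditionExpression": "#p = :p",
        "ExpressionAttributeNames": {"#p": index.partition_key},
        "ExpressionAttributeValues": {":p": {"S": partition_value}},
    }
    if index.name is not None:
        kwargs["IndexName"] = index.name  # omitted for the standard keys
    return kwargs


kwargs = build_query_kwargs("catalog", G1_PARTITION, "category:shirts")
```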
## Serialization

The library serializes record values to DynamoDB's low-level attribute-value format. Examples:

- strings become `{"S": "..."}`
- integers become `{"N": "..."}`
- booleans become `{"BOOL": ...}`
- datetimes become UTC ISO strings like `2021-02-03T00:00:00Z`
- timestamps become epoch seconds stored as `{"N": "..."}`
- sets become homogeneous DynamoDB set types (`SS`, `NS`, `BS`)
- maps become nested `{"M": ...}` values

Deserialization reverses that process when building records from rows.
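A minimal sketch of that serialization logic, assuming only the rules listed above (this is not the library's implementation, and it handles fewer cases than the real typed columns):

```python
import datetime


def to_attribute_value(value):
    """Sketch of Python-to-DynamoDB attribute-value conversion."""
    if isinstance(value, bool):  # must check bool before int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}  # numbers travel as strings
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, datetime.datetime):
        # UTC ISO string with a trailing "Z", as described above
        utc = value.astimezone(datetime.timezone.utc)
        return {"S": utc.strftime("%Y-%m-%dT%H:%M:%SZ")}
    if isinstance(value, set):
        if all(isinstance(v, str) for v in value):
            return {"SS": sorted(value)}
        return {"NS": sorted(str(v) for v in value)}
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    raise TypeError(f"Unsupported type: {type(value)!r}")
```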
## Responses

Read and write helpers return small dataclasses instead of bare dictionaries.

- `Response`: raw `request` and `response`
- `SingleRowResponse`: adds `row`
- `PagedRowResponse`: adds `pages`, `rows`, `first_row`, `iter_rows()`
- `ScannedRowResponse`: adds `completed`
- `SingleRecordResponse`: adds `record`
- `PagedRecordResponse`: adds `records`, `first_record`, `iter_records()`
- `ScannedRecordResponse`: defined for scanned record use cases

All response types expose `to_debug_dict()` for compact debug logging.
## Testing with the mock client

The package includes `dynamo_io.mock`, which provides an in-memory client compatible with this library's read/write helpers.

```python
import dynamo_io as dio
from dynamo_io import mock

client = mock.MockDynamoClient()

dio.upsert(
    client=client,
    table_name="NA",
    record=Product(
        product_id="product:123",
        sku="sku:red-small",
        category_key="category:shirts",
        inventory=10,
    ),
)

result = dio.get_record(
    client=client,
    table_name="NA",
    source=Product(product_id="product:123", sku="sku:red-small"),
)
```

`dynamo_io.mock` exports helpers for building raw DynamoDB attribute values: `mock.string(value)`, `mock.integer(value)`, `mock.number(value)`, `mock.boolean(value)`, `mock.timestamp(...)`, and `mock.date_time(...)`.
Example:

```python
from dynamo_io import mock as m

client = m.MockDynamoClient()
client.table.add_rows(
    {"pk": m.string("foo"), "sk": m.string("bar1"), "g1k": m.string("a")},
    {"pk": m.string("foo"), "sk": m.string("bar2"), "g1k": m.string("a")},
)
```

`client.table` exposes a `MockTable` with helpers that are useful in unit and scenario tests:

- `add_row`, `add_rows`
- `add_record`, `add_records`
- `update_row`, `update_rows`
- `update_record`, `update_records`
- `delete_key`, `delete_row`, `delete_rows`
- `delete_record`, `delete_records`
- `all_records(record_class)`
- `find_rows(needles)`
- `find_records(needles, record_class)`
- `assert_has_key(key)`
- `assert_row_values(key, comparisons)`
- `assert_matching_row_values(partition_key_value, sort_key_starts, comparisons, ...)`
String lookups in `find_rows` and `find_records` support wildcard matching via shell-style patterns such as `"prod:s*"`.
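Shell-style patterns of this kind are what Python's standard `fnmatch` module implements, so the matching behavior can be previewed without the mock (assuming the mock's semantics follow `fnmatch`, which is an inference, not a documented guarantee):

```python
import fnmatch

keys = ["prod:skittles", "prod:twix", "prod:snickers"]

# "prod:s*" matches keys that begin with "prod:s"
matches = [key for key in keys if fnmatch.fnmatch(key, "prod:s*")]
```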
The mock package also exports comparison helpers used in assertions: `is_anything`, `is_in`, `is_instance_of`, `is_iso`, `is_like`, `is_match`, `is_not_null`, and `is_optional`.
## Full example

```python
import dataclasses

import dynamo_io as dio
from dynamo_io import mock


@dataclasses.dataclass(frozen=True)
class Product(dio.Record):
    product: dio.TypeHints.KeyColumn = None
    line: dio.TypeHints.KeyColumn = None
    category: dio.TypeHints.String = None
    units: dio.TypeHints.Integer = None

    schema: dio.SchemaType = dio.Schema(
        partition_key=dio.PartitionColumn("product", "prod:"),
        sort_key=dio.SortColumn("line", "line:"),
        columns=(
            dio.GlobalFirstColumn("category", "cat:"),
            dio.IntegerColumn("units"),
        ),
    )


client = mock.MockDynamoClient()

dio.insert_records(
    client=client,
    table_name="NA",
    records=[
        Product(
            product="prod:twix",
            line="line:candy-bar",
            category="cat:candy",
            units=10,
        ),
        Product(
            product="prod:skittles",
            line="line:candy-pieces",
            category="cat:candy",
            units=5,
        ),
    ],
)

single = dio.get_record(
    client=client,
    table_name="NA",
    source=Product(product="prod:twix", line="line:candy-bar"),
)

by_category = dio.get_records_for_partition(
    client=client,
    table_name="NA",
    partition_key_value="cat:candy",
    index=dio.Indexes.G1_PARTITION,
    record_classes=[Product],
)
```

## Development

This repository is managed with Poetry and Taskipy.
Install dependencies:

```shell
poetry install
```

Run tests:

```shell
poetry run task test
```

Run the full local check suite:

```shell
poetry run task check
```

Available tasks in `pyproject.toml` include `task black`, `task flake8`, `task mypy`, `task radon`, `task test`, `task lint`, and `task check`.
## Exports

Top-level exports include:

- Record and schema types: `Record`, `Schema`, `SchemaType`, `Column`, the typed column helpers, `PartitionColumn`, `SortColumn`, `GlobalFirstColumn`, `GlobalSecondColumn`, `GlobalThirdColumn`
- Type helpers: `DynamoType`, `DynamoTypes`, `TypeHints`, `DELETE`
- Index helpers: `Index`, `Indexes`
- Read functions: `get_row`, `get_record`, `get_rows_for_partition`, `get_records_for_partition`, `get_indexed_row`, `get_indexed_rows`, `get_indexed_record`, `get_indexed_records`, `read_entire_table`
- Write functions: `insert_records`, `upsert`, `remove`, `transacts`
- Response types: `Response`, `SingleRowResponse`, `PagedRowResponse`, `SingleRecordResponse`, `PagedRecordResponse`
## License

MIT