graphrefly/graphrefly-py

GraphReFly

The reactive harness layer for agent workflows. Describe in plain language, review visually, run persistently, trace every decision.

GraphReFly makes long-running human + LLM co-operation reactive, resumable, and causally explainable. State pushes downstream on change (no re-reading), nodes have lifecycles (not infinite append), and every decision has a traceable causal chain — the substrate underneath tools, agents, and personal automations.


Docs | Spec | TypeScript | Python API


What can you do with it?

Email triage — "Watch my inbox. Urgent emails from my team go to a priority list. Newsletters get summarized weekly. Everything else, count by sender." It watches, classifies, and alerts — and when you ask "why was this flagged?", it walks you through the reasoning.

Spending alerts — Connect bank transactions to budget categories. Get a push notification when monthly dining exceeds your target. No polling, no manual checks — changes propagate the moment data arrives.

Knowledge management — Notes, bookmarks, highlights flow in. Contradictions surface automatically. Related ideas link themselves. Your second brain stays current without you maintaining it.


Quick start

pip install graphrefly
from graphrefly import state, derived, effect

count = state(0)
doubled = derived([count], lambda deps, _: deps[0] * 2)

effect([doubled], lambda deps, _: print("doubled:", deps[0]))
# → doubled: 0

count.push(3)
# → doubled: 6
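Under the hood the semantics are plain push-based reactivity: a write propagates to dependents, and effects fire once at creation and again on every change. A dependency-free sketch of those semantics (a conceptual model, not GraphReFly's implementation):

```python
# Minimal pure-Python sketch of push-based reactivity: state pushes to
# derived nodes and effects on change -- nothing re-reads or polls.

class State:
    def __init__(self, value):
        self.value = value
        self.subscribers = []          # downstream callbacks to notify

    def push(self, value):
        self.value = value
        for sub in self.subscribers:   # propagate on change
            sub()

def derived(deps, fn):
    node = State(fn([d.value for d in deps], None))
    def recompute():
        node.push(fn([d.value for d in deps], None))
    for d in deps:
        d.subscribers.append(recompute)
    return node

def effect(deps, fn):
    fn([d.value for d in deps], None)  # run once on creation
    for d in deps:
        d.subscribers.append(lambda: fn([d.value for d in deps], None))

log = []
count = State(0)
doubled = derived([count], lambda deps, _: deps[0] * 2)
effect([doubled], lambda deps, _: log.append(deps[0]))
count.push(3)
# log == [0, 6]
```

The real library adds batching, lifecycles, and glitch-free ordering on top of this basic shape.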

How it works

You describe what you need — an LLM composes a reactive graph (like SQL for data flows). The graph runs persistently, checkpoints its state, and traces every decision through a causal chain. Ask "why?" at any point and get a human-readable explanation from source to conclusion.
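One way to picture the causal chain: every computed value carries references to the inputs that produced it, and "why?" walks those parents recursively. A hypothetical sketch with invented names (not the library's trace format):

```python
# Sketch of causal tracing: each value records which inputs produced
# it, so asking "why?" walks the chain back to the sources.

class Traced:
    def __init__(self, name, value, causes=()):
        self.name, self.value, self.causes = name, value, causes

    def why(self, depth=0):
        lines = ["  " * depth + f"{self.name} = {self.value!r}"]
        for cause in self.causes:      # recurse into each parent
            lines.extend(cause.why(depth + 1))
        return lines

sender  = Traced("sender", "boss@corp.com")
subject = Traced("subject", "URGENT: outage")
flagged = Traced("flagged", True, causes=(sender, subject))

print("\n".join(flagged.why()))
# flagged = True
#   sender = 'boss@corp.com'
#   subject = 'URGENT: outage'
```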

Harness engineering coverage

The eight requirements of a production agent harness cluster into a handful of composed blocks that sit on top of the reactive graph primitives:

| Need | GraphReFly |
| --- | --- |
| Context & state | persistent_state() — auto_checkpoint + snapshot / restore + incremental diff |
| Agent memory | agent_memory() — distill + vectors + knowledge graph + tiers, OpenViking decay |
| Control flow & resilience | resilient_pipeline() — rate_limiter → breaker → retry → timeout → fallback, correct ordering built in |
| Execution & policy | guarded_execution() — Actor / Guard ABAC + policy() + budget_gate + scoped describe |
| Observability & causality | graph_lens() — reactive topology, health, flow, and why(node) causal chains as structured data |
| Human governance | gate — reactive pending / is_open with approve / reject / modify(fn, n) |
| Verification | Multi-model eval harness with regression gates |
| Continuous improvement | Strategy model: root_cause × intervention → success_rate |

The library computes structured facts reactively; LLMs and UIs render them. Natural language is never the library's job — which keeps the whole stack model-agnostic and testable.

Why GraphReFly?

| | Redux / Zustand | RxPY | Pydantic AI | LangGraph | TC39 Signals | GraphReFly |
| --- | --- | --- | --- | --- | --- | --- |
| Simple store API | yes | no | no | no | yes | yes |
| Streaming operators | no | yes | no | no | no | yes |
| Diamond resolution | no | n/a | n/a | n/a | partial | glitch-free |
| Graph introspection | no | no | no | checkpoints | no | describe / observe / diagram |
| Causal tracing | no | no | no | no | no | explain every decision |
| Durable checkpoints | no | no | no | yes | no | file / SQLite / IndexedDB |
| LLM orchestration | no | no | partial | yes | no | agent_loop / chat_stream / tool_registry |
| NL → graph composition | no | no | no | no | no | graph_from_spec / llm_compose |
| Async runners | n/a | asyncio | asyncio | asyncio | n/a | asyncio / trio |
| Dependencies | varies | 0 | many | many | n/a | 0 |
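The "glitch-free" diamond resolution deserves a word. When one source feeds two branches that rejoin (a diamond), eager per-edge propagation recomputes the join twice per change, briefly exposing a value that mixes one updated branch with one stale branch. A toy illustration of the hazard, not library code:

```python
# Diamond: source -> (left, right) -> total. Naive per-edge pushes
# compute `total` while `right` is still stale -- the "glitch".

glitches = []

def naive_push(src):
    left = src + 1
    right_stale = 0 * 2                  # right not yet updated for src
    glitches.append(left + right_stale)  # transient, inconsistent total
    right = src * 2
    glitches.append(left + right)        # settled, consistent total

naive_push(10)
# glitches == [11, 31]: the 11 never corresponded to any real state

def glitch_free(src):
    # Topological evaluation: both branches first, join exactly once.
    left, right = src + 1, src * 2
    return left + right

assert glitch_free(10) == 31             # only the consistent value exists
```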

One primitive

Everything is a node. Sugar constructors give you the right shape:

import time

from graphrefly import state, derived, producer, effect
from graphrefly.core.messages import DATA

# Writable state
name = state("world")

# Computed (re-runs when deps change)
greeting = derived([name], lambda deps, _: f"Hello, {deps[0]}!")

# Push source (timers, events, async streams)
clock = producer(lambda emit, _: emit([(DATA, time.time())]))

# Side effect
effect([greeting], lambda deps, _: print(deps[0]))

Streaming & operators

70+ operators — transform, combine, buffer, window, rate-limit, retry, circuit-break:

from graphrefly.extra.tier1 import map_op, filter_op, scan
from graphrefly.extra.tier2 import switch_map, debounce_time
from graphrefly.extra.resilience import retry
from graphrefly import pipe

# user_input, fetch, and from_promise are assumed defined elsewhere
search = pipe(
    user_input,
    debounce_time(0.3),
    switch_map(lambda q: from_promise(fetch(f"/api?q={q}"))),
    retry(strategy="exponential", max_attempts=3),
)
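The debounce_time step above forwards a value only after a quiet period; rapid-fire inputs supersede each other. A standalone asyncio sketch of that semantics (a model, not the operator's implementation):

```python
# Sketch of debounce_time semantics: forward a value only after `delay`
# seconds of silence; each newer value cancels the pending one.
import asyncio

def debounce_time(delay, downstream):
    pending = None
    async def fire(value):
        await asyncio.sleep(delay)
        downstream(value)
    def push(value):
        nonlocal pending
        if pending is not None:
            pending.cancel()           # newer value supersedes it
        pending = asyncio.ensure_future(fire(value))
    return push

async def demo():
    out = []
    push = debounce_time(0.05, out.append)
    for q in ("g", "gr", "gra"):       # typed quickly: only last survives
        push(q)
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.1)           # let the final timer fire
    return out

result = asyncio.run(demo())
print(result)
# ['gra']
```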

Graph container

Register nodes in a Graph for introspection, snapshot, and persistence:

from graphrefly import Graph, state, derived

g = Graph("pricing")
price = g.register("price", state(100))
tax   = g.register("tax", derived([price], lambda d, _: d[0] * 0.1))
total = g.register("total", derived([price, tax], lambda d, _: d[0] + d[1]))

g.describe()   # → full graph topology as dict
g.diagram()    # → Mermaid diagram string
g.observe(lambda e: print(e))  # → live change stream
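diagram() returning a Mermaid string is conceptually just a walk over the registered edges. A hypothetical sketch of such a generator (the library's actual output format may differ):

```python
# Hypothetical sketch of Mermaid generation from a graph's edge list.

def diagram(edges):
    lines = ["graph TD"]                     # Mermaid flowchart header
    for src, dst in edges:
        lines.append(f"    {src} --> {dst}")
    return "\n".join(lines)

# Edges from the pricing graph above
edges = [("price", "tax"), ("price", "total"), ("tax", "total")]
print(diagram(edges))
# graph TD
#     price --> tax
#     price --> total
#     tax --> total
```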

AI & orchestration

First-class patterns for LLM streaming, agent loops, and human-in-the-loop workflows:

from graphrefly.patterns.ai import chat_stream, agent_loop, tool_registry
from graphrefly.patterns.memory import agent_memory

# Streaming chat with tool use (search_fn is any callable tool)
chat = chat_stream("assistant", model="claude-sonnet-4-20250514",
                   tools=tool_registry("tools", search=search_fn))

# Full agent loop: observe → think → act → memory
agent = agent_loop("researcher", llm=chat,
                   memory=agent_memory(decay="openviking"))

Async runners

Native asyncio and trio support for async sources and long-running graphs:

from graphrefly.compat.asyncio_runner import AsyncioRunner
from graphrefly.extra.sources import from_async_iter

# Wrap an async generator as a reactive node
async def sse_events():
    async with httpx_client.stream("GET", "/events") as response:
        async for line in response.aiter_lines():
            yield line

events = from_async_iter(sse_events())

# Run the graph in an asyncio event loop
runner = AsyncioRunner(graph)
await runner.run()
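Conceptually, from_async_iter drains the iterator and pushes each yielded item into the graph. A standalone sketch of that bridge (not the library's code):

```python
# Sketch of wrapping an async iterator as a push source: each yield
# becomes a downstream push.
import asyncio

async def drive(async_iter, push):
    async for item in async_iter:
        push(item)                  # each yield becomes a graph push

async def numbers():
    for i in range(3):
        await asyncio.sleep(0)      # stand-in for real async I/O
        yield i

async def demo():
    seen = []
    await drive(numbers(), seen.append)
    return seen

received = asyncio.run(demo())
print(received)
# [0, 1, 2]
```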

FastAPI integration

Drop-in integration for reactive backends:

from graphrefly.integrations.fastapi import GraphReflyRouter

router = GraphReflyRouter(graph)
app.include_router(router, prefix="/graph")
# GET /graph/describe  → graph topology
# GET /graph/snapshot  → current state
# WS  /graph/observe   → live change stream

Resilience & checkpoints

Built-in retry, circuit breakers, rate limiters, and persistent checkpoints:

from graphrefly.extra.resilience import retry, circuit_breaker, rate_limiter
from graphrefly.extra.checkpoint import FileCheckpointAdapter, save_graph_checkpoint

# Retry with exponential backoff
resilient = pipe(source, retry(strategy="exponential"))

# Circuit breaker
breaker = circuit_breaker(threshold=5, reset_timeout=30.0)

# Checkpoint to file system
adapter = FileCheckpointAdapter("./checkpoints")
save_graph_checkpoint(graph, adapter)
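The exponential strategy shown above doubles the delay between attempts. A small self-contained model of that behavior, with the sleep injected so it can be inspected (a sketch, not the library's retry operator):

```python
# Sketch of exponential-backoff retry: delays double per failed attempt
# until max_attempts is exhausted.

def retry_call(fn, max_attempts=3, base_delay=1.0, sleep=lambda s: None):
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise                  # out of attempts: re-raise
            sleep(delay)               # real code would time.sleep(delay)
            delay *= 2                 # exponential growth: 1s, 2s, 4s...

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient")
    return "ok"

delays = []
result = retry_call(flaky, max_attempts=5, sleep=delays.append)
# result == "ok" after 3 calls; delays == [1.0, 2.0]
```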

Project layout

| Path | Contents |
| --- | --- |
| src/graphrefly/core/ | Message protocol, node primitive, batch, sugar constructors |
| src/graphrefly/extra/ | Operators, sources, data structures, resilience, checkpoints |
| src/graphrefly/graph/ | Graph container, describe/observe, snapshot, persistence |
| src/graphrefly/patterns/ | Orchestration, messaging, memory, AI, CQRS, reactive layout |
| src/graphrefly/compat/ | Async runners (asyncio, trio) |
| src/graphrefly/integrations/ | Framework integrations (FastAPI) |
| docs/ | Roadmap, guidance, benchmarks |
| website/ | Astro + Starlight docs site (py.graphrefly.dev) |

Scripts

uv run pytest              # run tests
uv run ruff check .        # lint
uv run mypy src/           # type check
uv run pytest --benchmark  # benchmarks

Requirements

Python 3.12 or later. Zero runtime dependencies.

Acknowledgments

GraphReFly builds on ideas from many projects and papers:

Protocol & predecessor:

  • Callbag (André Staltz) — the original reactive protocol spec. GraphReFly's message-based node communication descends from callbag's function-calling-function model.
  • callbag-recharge & callbag-recharge-py — GraphReFly's direct predecessors. The Python port (6 primitives, 18 operators, 100+ tests) established cross-language parity patterns carried forward.

Reactive design patterns:

  • SolidJS — two-phase execution (DIRTY propagation + value flow), automatic caching, and effect batching. Closest philosophical neighbor.
  • Preact Signals — fine-grained reactivity and cached-flag optimization patterns that informed RESOLVED signal design.
  • TC39 Signals Proposal — the .get()/.set() contract and the push toward language-level reactivity.
  • RxJS / RxPY — operator naming conventions and the DevTools observability philosophy that inspired the Inspector pattern.

AI & memory:

  • OpenViking (Volcengine) — the memory decay formula (sigmoid(log1p(count)) * exp_decay(age, 7d)) and L0/L1/L2 progressive loading strategy used in agent_memory().
  • FadeMem (Wei et al., ICASSP 2026) — biologically-inspired dual-layer memory with adaptive exponential decay.
  • MAGMA (Jiang et al., 2026) — four-parallel-graph model (semantic/temporal/causal/entity) that informed knowledge_graph() design.
  • Letta/MemGPT, Mem0, Zep/Graphiti, Cognee — production memory architectures surveyed during agent_memory() design.
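The OpenViking decay score quoted above can be written out directly. A sketch assuming exp_decay is a 7-day half-life curve; the exact curve OpenViking uses may differ:

```python
# sigmoid(log1p(access_count)) * exp_decay(age, 7d), with exp_decay
# assumed here to be 0.5 ** (age_days / 7) -- a common half-life form.
import math

def memory_score(access_count, age_days, half_life_days=7.0):
    reinforcement = 1.0 / (1.0 + math.exp(-math.log1p(access_count)))
    decay = 0.5 ** (age_days / half_life_days)
    return reinforcement * decay

# Frequently-touched but old vs. fresh but rarely touched:
old_popular = memory_score(access_count=50, age_days=30)
fresh_rare  = memory_score(access_count=1, age_days=1)
```

Reinforcement saturates (log1p flattens repeated access), while age erodes the score geometrically, which is what lets rarely-used memories fade out of the working tiers.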

Layout & other:

  • Pretext (Cheng Lou) — inspired the reactive layout engine's DOM-free text measurement pipeline.
  • CASL — declarative allow()/deny() policy builder DX that inspired policy().
  • Nanostores — tiny framework-agnostic API with .get()/.set()/.subscribe() mapping that validated the store ergonomics.

License

MIT