vkaracic/mini_zapier

Mini Zapier

A local-first workflow automation engine where an AI agent is the orchestrator. Instead of rigid if/then chains, workflows are driven by Claude through raw Anthropic SDK tool use.

No LangChain. No CrewAI. Just a ReAct loop, a tool registry, and YAML workflow definitions.

I built this primarily to learn agentic AI development, so it is more educational than practical, although there are practical use cases!

How it works

The core idea is simple: define a workflow in YAML, and let the agent figure out the details.

There are two modes:

Goal mode - you describe what you want, the agent picks the tools and sequence:

name: amazon_price_alert
mode: goal
params:
  item_id:
    description: "Amazon ASIN"
  price_threshold:
    description: "Alert below this price in euros"
goal: >
  Check the price of Amazon item '{{ params.item_id }}' on amazon.de.
  If lower than {{ params.price_threshold }} euros, send an email alert.

Explicit mode - you define the steps, the agent handles errors and data flow:

name: hn_interests_summary
mode: explicit
params:
  interests:
    default: "programming, AI, agents, LLM, AWS"
steps:
  - name: fetch_hn
    tool: http_request
    params:
      method: "GET"
      url: "https://hnrss.org/frontpage"
  - name: parse_feed
    tool: rss_parser
    params:
      xml: "{{ previous_step.output }}"
  - name: filter_by_interests
    tool: structured_agent
    params:
      task: >
        Here are my interests: [{{ params.interests }}].
        Return only matching stories with title, url, and matched interest.
        Stories: {{ previous_step.output }}
      output_schema:
        type: object
        properties:
          stories:
            type: array
            items:
              type: object
              properties:
                title: { type: string }
                url: { type: string }
                matched_interest: { type: string }
              required: [title, url, matched_interest]
        required: [stories]

Under the hood, the agent runs a ReAct loop (plan, act, observe, decide) with a hard cap of 15 iterations. Every tool call and result is logged to SQLite, so you can inspect exactly what happened after the fact.
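To make the step log concrete, here is a minimal sketch of recording tool calls and reading back a trace. It uses the standard-library sqlite3 module with an in-memory database for brevity, whereas the project uses aiosqlite. The table name, column names, and helper functions are assumptions for illustration, not the actual schema in store/database.py.

```python
import json
import sqlite3

# Hypothetical schema: the real tables in store/database.py may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS step_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    execution_id TEXT NOT NULL,
    iteration INTEGER NOT NULL,
    tool_name TEXT NOT NULL,
    tool_input TEXT NOT NULL,
    tool_output TEXT NOT NULL
)
"""


def log_step(conn, execution_id, iteration, tool_name, tool_input, tool_output):
    """Record one tool call and its result (input is stored as JSON text)."""
    conn.execute(
        "INSERT INTO step_logs "
        "(execution_id, iteration, tool_name, tool_input, tool_output) "
        "VALUES (?, ?, ?, ?, ?)",
        (execution_id, iteration, tool_name, json.dumps(tool_input), tool_output),
    )
    conn.commit()


def get_trace(conn, execution_id):
    """Return the logged steps for one execution, in insertion order."""
    rows = conn.execute(
        "SELECT iteration, tool_name, tool_input, tool_output "
        "FROM step_logs WHERE execution_id = ? ORDER BY id",
        (execution_id,),
    ).fetchall()
    return [
        {"iteration": i, "tool": name, "input": json.loads(inp), "output": out}
        for i, name, inp, out in rows
    ]


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
log_step(conn, "exec-1", 1, "http_request",
         {"method": "GET", "url": "https://hnrss.org/frontpage"}, "<rss>...</rss>")
log_step(conn, "exec-1", 2, "rss_parser", {"xml": "<rss>...</rss>"}, "[]")
trace = get_trace(conn, "exec-1")
```

Storing the tool input as JSON text keeps the table flat while still letting a trace be reconstructed step by step.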

Setup

Requires Python 3.14+ and uv.

git clone <repo-url> && cd mini_zapier
cp .example.env .env
# Fill in your Anthropic API key and SMTP credentials
uv sync

Running the server

uv run fastapi dev main.py

Swagger UI is then available at http://localhost:8000/docs, with endpoints for each workflow.

Running from the CLI

# List available workflows
uv run python cli.py list

# Run a workflow
uv run python cli.py run hn_interests_summary.yaml

# Pass parameters
uv run python cli.py run amazon_price_alert.yaml \
  -p item_id=B0CQQVNCB6 \
  -p price_threshold=600

# Validate all workflows (checks YAML syntax + tool names)
uv run python cli.py validate
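The tool-name check performed by validate could look roughly like the sketch below. It works on an already-parsed workflow dict (for example the result of PyYAML's yaml.safe_load), so the YAML syntax check is out of scope here. The function name validate_workflow and the mode checks are assumptions for illustration; the real command may check more or less.

```python
def validate_workflow(workflow: dict, known_tools: set[str]) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    mode = workflow.get("mode")
    if mode == "goal":
        if not workflow.get("goal"):
            errors.append("goal mode requires a 'goal' field")
    elif mode == "explicit":
        steps = workflow.get("steps") or []
        if not steps:
            errors.append("explicit mode requires at least one step")
        for step in steps:
            tool = step.get("tool")
            if tool not in known_tools:
                errors.append(f"step '{step.get('name')}' uses unknown tool '{tool}'")
    else:
        errors.append(f"unknown mode: {mode!r}")
    return errors


known = {"http_request", "rss_parser", "structured_agent"}
good = {
    "name": "hn_interests_summary",
    "mode": "explicit",
    "steps": [{"name": "fetch_hn", "tool": "http_request"}],
}
bad = {
    "name": "broken",
    "mode": "explicit",
    "steps": [{"name": "fetch", "tool": "http_reqest"}],  # deliberate typo
}
good_errors = validate_workflow(good, known)
bad_errors = validate_workflow(bad, known)
```

Catching a misspelled tool name at validation time is cheaper than discovering it halfway through an agent run.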

Tools

Every tool extends BaseTool, implements execute() and get_parameters_schema(), and auto-generates its own Anthropic tool definition. The agent sees all registered tools and picks the right one for the job.
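As a rough picture of that contract, the sketch below shows one way a base class can derive a tool definition from its own metadata. The method name to_anthropic_tool and the EchoTool class are hypothetical, and ToolResult is a stand-in whose fields follow the usage shown in this README. The resulting dict shape (name, description, input_schema) is the format the Anthropic Messages API expects for tools.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolResult:
    # Stand-in for the project's ToolResult model.
    success: bool
    output: str
    data: dict[str, Any] = field(default_factory=dict)


class BaseTool(ABC):
    name: str
    description: str

    @abstractmethod
    def get_parameters_schema(self) -> dict:
        """Return a JSON Schema describing the tool's parameters."""

    @abstractmethod
    async def execute(self, **params: Any) -> ToolResult:
        """Run the tool with the given parameters."""

    def to_anthropic_tool(self) -> dict:
        # Hypothetical method name; builds the definition from the tool's own metadata.
        return {
            "name": self.name,
            "description": self.description,
            "input_schema": self.get_parameters_schema(),
        }


class EchoTool(BaseTool):
    # Hypothetical tool used only to exercise the base class.
    name = "echo"
    description = "Return the given text unchanged"

    def get_parameters_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        }

    async def execute(self, **params: Any) -> ToolResult:
        return ToolResult(success=True, output=params["text"])


definition = EchoTool().to_anthropic_tool()
result = asyncio.run(EchoTool().execute(text="hello"))
```

Because the definition is generated from the same schema the tool validates against, the description the agent sees cannot drift from what the tool accepts.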

Tool                 What it does
http_request         HTTP requests with configurable method, headers, body
agent                Delegates a subtask to another agent instance
structured_agent     Same as above, but forces structured JSON output via a schema
email                Sends email via SMTP
rss_parser           Parses RSS/Atom XML into structured items
scrape_amazon_price  Extracts price from an Amazon product page
file_io              Read/write files (sandboxed to /tmp/)
shell_command        Runs allowlisted shell commands (ls, pwd, date, etc.)
json_parser          Parses a JSON string
regex                Pattern matching and replacement
base64               Encode/decode
delay                Sleep (max 60s)
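The two guarded tools in the table, file_io and shell_command, rely on a path sandbox and a command allowlist. A minimal sketch of both checks follows. The function names are hypothetical, the allowlist contains only the three commands named above, and the actual tools may implement these checks differently.

```python
import shlex
from pathlib import Path

# Assumption: the real allowlist may contain more commands than these three.
ALLOWED_COMMANDS = {"ls", "pwd", "date"}
# Resolved once so that symlinked temp directories compare correctly.
SANDBOX_ROOT = Path("/tmp").resolve()


def is_command_allowed(command: str) -> bool:
    """Allow a command only if its first token is on the allowlist.

    This is only meaningful if the command is later executed as a token
    list without a shell, so operators such as && are never interpreted.
    """
    try:
        tokens = shlex.split(command)
    except ValueError:  # e.g. unbalanced quotes
        return False
    return bool(tokens) and tokens[0] in ALLOWED_COMMANDS


def is_path_sandboxed(path: str) -> bool:
    """Accept a path only if it resolves to the sandbox root or below it."""
    resolved = Path(path).resolve()
    return resolved == SANDBOX_ROOT or SANDBOX_ROOT in resolved.parents
```

Resolving the path before comparing is what defeats traversal attempts such as /tmp/../etc/passwd.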

Adding a new tool

Create a file in tools/, subclass BaseTool, and register it.

# tools/word_count.py
from tools.base import BaseTool, ToolResult


class WordCountTool(BaseTool):
    name = "word_count"
    description = "Count the number of words in a text"

    def get_parameters_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "text": {"type": "string"},
            },
            "required": ["text"],
        }

    async def execute(self, **params) -> ToolResult:
        count = len(params["text"].split())
        return ToolResult(success=True, output=str(count), data={"count": count})

Then add it to register_all_tools() in tools/base.py:

from tools.word_count import WordCountTool
# ...
tool_registry.register_tool(WordCountTool())

That's it. The tool is now available to the agent and can be used in any workflow.

Adding a new workflow

Drop a YAML file in workflows/custom/. Two fields control the execution mode:

  • mode: goal + goal: - the agent drives everything
  • mode: explicit + steps: - you define the pipeline

Template variables available in step params and goals:

  • {{ previous_step.output }} - output from the last step
  • {{ params.X }} - runtime parameter, passed via CLI (-p X=value) or API
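How these placeholders get substituted can be sketched with a small regex-based renderer. This is an illustration only: the engine in workflows/engine.py may well use a full template library instead, and the render function and context layout here are assumptions.

```python
import re

# Matches {{ dotted.name }} with optional surrounding whitespace.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace each {{ a.b }} placeholder with context["a"]["b"]."""

    def resolve(match: re.Match) -> str:
        value = context
        for key in match.group(1).split("."):
            value = value[key]  # raises KeyError for unknown names
        return str(value)

    return PLACEHOLDER.sub(resolve, template)


context = {
    "params": {"item_id": "B0CQQVNCB6", "price_threshold": 600},
    "previous_step": {"output": "[]"},
}
rendered = render(
    "Check item '{{ params.item_id }}' below {{ params.price_threshold }} euros.",
    context,
)
```

Failing loudly on an unknown name, rather than rendering an empty string, makes a typo in a workflow file surface before the agent sees a malformed task.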

Workflows with a schedule field (cron syntax) are automatically picked up by the scheduler on server startup:

name: daily_digest
schedule: "0 9 * * *"
mode: explicit
# ...
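A startup routine that picks up scheduled workflows might first split each cron expression into named fields, as in the sketch below. The helper names are hypothetical. The field names match the keyword arguments of APScheduler's CronTrigger, but the project may simply hand the string to APScheduler directly, so treat this as an illustration of the five-field format, not the actual scheduler code.

```python
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def parse_schedule(expr: str) -> dict[str, str]:
    """Split a five-field cron expression into named fields."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}: {expr!r}")
    return dict(zip(CRON_FIELDS, parts))


def scheduled_workflows(workflows: list[dict]) -> dict[str, dict[str, str]]:
    """Map workflow name to parsed schedule, skipping unscheduled workflows."""
    return {
        wf["name"]: parse_schedule(wf["schedule"])
        for wf in workflows
        if wf.get("schedule")
    }


workflows = [
    {"name": "daily_digest", "schedule": "0 9 * * *", "mode": "explicit"},
    {"name": "amazon_price_alert", "mode": "goal"},  # no schedule: run on demand
]
jobs = scheduled_workflows(workflows)
```

Here "0 9 * * *" means minute 0 of hour 9 on every day, so daily_digest would run once a day at 09:00.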

API

The server exposes dedicated endpoints for each example workflow, plus generic ones:

POST /workflows/amazon-price-alert     {item_id, domain?, price_threshold}
POST /workflows/hn-email-digest        {to_email}
POST /workflows/hn-interests-summary   {interests?}
POST /workflows/hacker-news-summary
POST /workflows/get-latest-hn-link
POST /workflows/check-api

POST /test-workflow                     {path, params?}
POST /test                             {task}

GET  /executions/                      ?limit=20&offset=0
GET  /executions/{id}                  Full trace with step logs

The execution history endpoints return every tool call the agent made, with inputs and outputs - useful for debugging workflows or understanding how the agent approached a task.
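For scripting against the server, requests to the generic endpoints can be built with the standard library alone. The sketch below only constructs the request and URL; sending them requires a running server. The helper names are hypothetical, and the value passed as path is an assumption about what /test-workflow accepts.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000"  # address used in "Running the server"


def build_test_workflow_request(
    path: str, params: Optional[dict] = None
) -> urllib.request.Request:
    """Build a POST /test-workflow request with a JSON body of {path, params?}."""
    body = {"path": path}
    if params:
        body["params"] = params
    return urllib.request.Request(
        f"{BASE_URL}/test-workflow",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def executions_url(limit: int = 20, offset: int = 0) -> str:
    """Build the paginated execution history URL."""
    return f"{BASE_URL}/executions/?limit={limit}&offset={offset}"


def send(request: urllib.request.Request) -> dict:
    """Send a request and decode the JSON response (needs a running server)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Assumption: the endpoint accepts a workflow file name like the CLI does.
request = build_test_workflow_request(
    "hn_interests_summary.yaml", {"interests": "AI, agents"}
)
```

With the server running, send(request) would trigger the workflow, and the resulting trace could then be looked up through the URL returned by executions_url().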

Architecture

agent/
  core.py          ReAct loop -- sends messages to Claude, executes tool calls, logs steps
  prompts.py       System prompt

tools/
  base.py          BaseTool ABC, ToolRegistry, ToolResult model
  *.py             One file per tool

workflows/
  models.py        Pydantic models (Workflow, WorkflowStep, WorkflowParam)
  engine.py        Runs workflows -- param resolution, template rendering, step execution
  loaders.py       YAML parsing
  router.py        FastAPI endpoints per workflow
  examples/*.yaml  Example workflows

store/
  database.py      SQLite via aiosqlite -- execution and step log tables
  router.py        Execution history API

cli.py             CLI runner (run, list, validate)
main.py            FastAPI app, tool registration, scheduler setup
config.py          Pydantic settings from .env

The agent loop in agent/core.py is intentionally minimal. It sends messages to Claude with the full tool registry, processes tool_use responses by executing the corresponding tool, feeds the results back, and repeats until Claude signals that it is done or the loop hits the iteration limit. No framework magic, no abstractions beyond what's needed.
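The shape of such a loop can be sketched without the SDK by injecting the model call as a plain function. In the sketch below, run_agent, call_model, and fake_model are hypothetical names, and tools are simple callables instead of BaseTool instances. The message and block shapes (tool_use, tool_result, stop_reason) follow the Anthropic Messages API, but this is not the code in agent/core.py.

```python
from typing import Any, Callable

MAX_ITERATIONS = 15  # the hard cap mentioned in this README


def run_agent(
    task: str,
    call_model: Callable[[list], dict],
    tools: dict[str, Callable[..., str]],
    max_iterations: int = MAX_ITERATIONS,
) -> str:
    """Call the model, run any requested tools, feed results back, repeat."""
    messages: list[dict[str, Any]] = [{"role": "user", "content": task}]
    for _ in range(max_iterations):
        response = call_model(messages)
        messages.append({"role": "assistant", "content": response["content"]})
        if response["stop_reason"] != "tool_use":
            # The model is done: return its text blocks as the final answer.
            return "".join(
                b["text"] for b in response["content"] if b["type"] == "text"
            )
        results = []
        for block in response["content"]:
            if block["type"] == "tool_use":
                output = tools[block["name"]](**block["input"])
                results.append(
                    {"type": "tool_result", "tool_use_id": block["id"], "content": output}
                )
        messages.append({"role": "user", "content": results})
    return "Stopped: iteration limit reached"


def fake_model(messages: list) -> dict:
    """Scripted stand-in for Claude: one tool call, then a final answer."""
    if len(messages) == 1:
        return {
            "stop_reason": "tool_use",
            "content": [
                {"type": "tool_use", "id": "call_1", "name": "word_count",
                 "input": {"text": "one two three"}}
            ],
        }
    count = messages[-1]["content"][0]["content"]
    return {
        "stop_reason": "end_turn",
        "content": [{"type": "text", "text": f"The text has {count} words."}],
    }


tools = {"word_count": lambda text: str(len(text.split()))}
answer = run_agent("Count the words in 'one two three'", fake_model, tools)
```

Injecting the model call keeps the loop testable offline: a scripted function stands in for Claude, and the iteration cap can be exercised with a model that never stops asking for tools.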

Development

uv run ruff check .          # Lint
uv run ruff format .         # Format
uv run mypy .                # Type check
uv run pytest -v             # Tests

Tech stack

  • Python 3.14 with uv
  • Anthropic SDK -- direct tool use, no agent frameworks
  • FastAPI -- async API with Swagger UI
  • SQLite via aiosqlite -- execution traces
  • APScheduler -- cron-based workflow scheduling
  • Pydantic v2 -- models, settings, validation
