-
Notifications
You must be signed in to change notification settings - Fork 3
docs: add embedded Go API usage #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
yottahmd
wants to merge
1
commit into
main
Choose a base branch
from
docs/embedded-dagu-api
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,251 @@ | ||
| # Embedded Go API | ||
|
|
||
| The Go package `github.com/dagucloud/dagu` exposes an experimental API for starting Dagu DAG runs from another Go application. | ||
|
|
||
| The API is marked experimental in the package documentation. Names, options, and behavior may change before this API is declared stable. | ||
|
|
||
| ```go | ||
| import "github.com/dagucloud/dagu" | ||
| ``` | ||
|
|
||
| ## Local Execution | ||
|
|
||
| `dagu.New` creates an embedded engine backed by Dagu's file stores. `RunFile` and `RunYAML` load a DAG, start the run asynchronously, and return a `*dagu.Run`. | ||
|
|
||
| ```go | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log" | ||
| "time" | ||
|
|
||
| "github.com/dagucloud/dagu" | ||
| ) | ||
|
|
||
| func main() { | ||
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) | ||
| defer cancel() | ||
|
|
||
| engine, err := dagu.New(ctx, dagu.Options{ | ||
| HomeDir: "/var/lib/myapp/dagu", | ||
| }) | ||
| if err != nil { | ||
| log.Fatal(err) | ||
| } | ||
| defer func() { | ||
| if err := engine.Close(context.Background()); err != nil { | ||
| log.Printf("close Dagu engine: %v", err) | ||
| } | ||
| }() | ||
|
|
||
| run, err := engine.RunYAML(ctx, []byte(` | ||
| name: embedded-local | ||
| params: | ||
| - MESSAGE | ||
| steps: | ||
| - name: hello | ||
| command: echo "${MESSAGE}" | ||
| `), dagu.WithParams(map[string]string{ | ||
| "MESSAGE": "hello from the host app", | ||
| })) | ||
| if err != nil { | ||
| log.Fatal(err) | ||
| } | ||
|
|
||
| status, err := run.Wait(ctx) | ||
| if err != nil { | ||
| log.Fatal(err) | ||
| } | ||
| fmt.Printf("%s finished with %s\n", status.RunID, status.Status) | ||
| } | ||
| ``` | ||
|
|
||
| The same engine can load a workflow file: | ||
|
|
||
| ```go | ||
| run, err := engine.RunFile(ctx, "/var/lib/myapp/workflows/report.yaml") | ||
| ``` | ||
|
|
||
| `Wait` blocks until the run reaches a terminal state or the context is canceled. It returns a non-nil status when the final status can be read. A non-success terminal status is returned as an error. | ||
|
|
||
| ## Engine Options | ||
|
|
||
| `dagu.Options` maps to Dagu's existing config and file-backed storage paths. | ||
|
|
||
| | Field | Effect | | ||
| | --- | --- | | ||
| | `HomeDir` | Dagu application home used for default config and data paths. | | ||
| | `ConfigFile` | Explicit Dagu config file. | | ||
| | `DAGsDir` | Directory used to resolve named DAGs and sub-DAGs. | | ||
| | `DataDir` | File-backed state directory. | | ||
| | `LogDir` | Run log directory. | | ||
| | `ArtifactDir` | Artifact directory. | | ||
| | `BaseConfig` | Base config file applied while loading DAGs. | | ||
| | `Logger` | `*slog.Logger` used by the embedded engine. A quiet logger is used when nil. | | ||
| | `DefaultMode` | Default execution mode for runs that do not pass `WithMode`. | | ||
| | `Distributed` | Coordinator, TLS, worker selector, and status polling options for distributed mode. | | ||
|
|
||
| `Close` releases engine-owned resources. It does not replace `Run.Wait`, `Run.Stop`, or worker shutdown. | ||
|
|
||
| ## Run Options | ||
|
|
||
| Run options apply to a single `RunFile` or `RunYAML` call. | ||
|
|
||
| | Option | Effect | | ||
| | --- | --- | | ||
| | `WithRunID(id)` | Sets an explicit DAG run ID. | | ||
| | `WithName(name)` | Overrides the loaded DAG name. | | ||
| | `WithParams(map[string]string)` | Sets DAG parameters from a Go map. | | ||
| | `WithParamsList([]string)` | Sets DAG parameters from `KEY=VALUE` entries. | | ||
| | `WithDefaultWorkingDir(dir)` | Sets the default working directory while loading the DAG. | | ||
| | `WithMode(mode)` | Overrides the engine default execution mode. | | ||
| | `WithWorkerSelector(map[string]string)` | Sets the distributed worker selector for one run. | | ||
| | `WithTags(tags...)` | Adds tags to one run. | | ||
| | `WithDryRun(enabled)` | Enables or disables dry-run mode. | | ||
|
|
||
| ## Run Status And Cancellation | ||
|
|
||
| The run handle exposes the run reference and current state: | ||
|
|
||
| ```go | ||
| ref := run.Ref() | ||
| status, err := run.Status(ctx) | ||
| err = run.Stop(ctx) | ||
| ``` | ||
|
|
||
| `Engine.Status(ctx, ref)` and `Engine.Stop(ctx, ref)` operate on a run reference. `Engine.Status` reads the local file-backed run status. For distributed runs, use the `Run` returned by `RunFile` or `RunYAML`; its `Status` method reads status from the coordinator. | ||
|
|
||
| `dagu.Status` contains the DAG name, run ID, attempt ID, status string, start and finish times, error string, log file path, archive directory, worker ID, and trigger type. | ||
|
|
||
| ## Custom Executors | ||
|
|
||
| `RegisterExecutor` registers a process-local executor type that can be used in DAG YAML. | ||
|
|
||
| Registration mutates process-global state. Register custom executors before concurrent DAG loading or execution. `UnregisterExecutor` exists for tests and should not run concurrently with engine use. | ||
|
|
||
| ```go | ||
| dagu.RegisterExecutor( | ||
| "embedded_echo", | ||
| func(_ context.Context, step dagu.Step) (dagu.Executor, error) { | ||
| return &echoExecutor{step: step}, nil | ||
| }, | ||
| dagu.WithExecutorCapabilities(dagu.ExecutorCapabilities{Command: true}), | ||
| ) | ||
| ``` | ||
|
|
||
| The executor type name must start with a letter and may contain letters, digits, `_`, and `-`. | ||
|
|
||
| ```go | ||
| type echoExecutor struct { | ||
| step dagu.Step | ||
| stdout io.Writer | ||
| } | ||
|
|
||
| func (e *echoExecutor) SetStdout(out io.Writer) { | ||
| e.stdout = out | ||
| } | ||
|
|
||
| func (e *echoExecutor) SetStderr(io.Writer) {} | ||
|
|
||
| func (e *echoExecutor) Kill(os.Signal) error { | ||
| return nil | ||
| } | ||
|
|
||
| func (e *echoExecutor) Run(context.Context) error { | ||
| out := e.stdout | ||
| if out == nil { | ||
| out = io.Discard | ||
| } | ||
| _, err := fmt.Fprintf(out, "handled %s\n", e.step.Name) | ||
| return err | ||
| } | ||
| ``` | ||
|
|
||
| YAML can then reference the registered type: | ||
|
|
||
| ```yaml | ||
| name: embedded-custom | ||
| steps: | ||
| - name: call-go-code | ||
| type: embedded_echo | ||
| command: domain operation from DAG YAML | ||
| ``` | ||
|
|
||
| A custom executor is registered only in the current Go process. In distributed mode, every worker process that may execute a DAG containing that type must register the same executor type before it starts processing tasks. | ||
|
|
||
| ## Distributed Execution | ||
|
|
||
| Embedded distributed execution dispatches a loaded DAG to an existing Dagu coordinator. The public embedded API does not start a coordinator; start one with Dagu server commands such as `dagu coord` or `dagu start-all` with coordinator settings. | ||
|
|
||
| The embedded coordinator client requires TLS configuration unless plaintext is explicitly enabled. For a local plaintext coordinator: | ||
|
|
||
| ```go | ||
| TLS: dagu.TLSOptions{Insecure: true} | ||
| ``` | ||
|
|
||
| For TLS connections, use `CertFile`, `KeyFile`, `ClientCAFile`, and optionally `SkipTLSVerify`. | ||
|
|
||
| ```go | ||
| engine, err := dagu.New(ctx, dagu.Options{ | ||
| HomeDir: "/var/lib/myapp/dagu", | ||
| DefaultMode: dagu.ExecutionModeDistributed, | ||
| Distributed: &dagu.DistributedOptions{ | ||
| Coordinators: []string{"127.0.0.1:50055"}, | ||
| TLS: dagu.TLSOptions{Insecure: true}, | ||
| WorkerSelector: map[string]string{ | ||
| "pool": "embedded", | ||
| }, | ||
| PollInterval: time.Second, | ||
| }, | ||
| }) | ||
| ``` | ||
|
|
||
| `RunFile` and `RunYAML` load and validate the DAG in the caller process, then dispatch the loaded YAML definition to the coordinator. Shared-nothing workers receive the DAG definition from the coordinator and do not read the caller's DAG file path. | ||
|
|
||
| An embedded worker can run in the same host process: | ||
|
|
||
| ```go | ||
| worker, err := engine.NewWorker(dagu.WorkerOptions{ | ||
| ID: "embedded-worker-1", | ||
| MaxActiveRuns: 4, | ||
| Labels: map[string]string{"pool": "embedded"}, | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| workerCtx, stopWorker := context.WithCancel(ctx) | ||
| defer stopWorker() | ||
|
|
||
| go func() { | ||
| if err := worker.Start(workerCtx); err != nil && !errors.Is(err, context.Canceled) { | ||
| log.Printf("worker stopped: %v", err) | ||
| } | ||
| }() | ||
|
|
||
| if err := worker.WaitReady(ctx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| run, err := engine.RunFile(ctx, "/var/lib/myapp/workflows/report.yaml") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| status, err := run.Wait(ctx) | ||
| ``` | ||
|
|
||
| `WorkerOptions.Coordinators` overrides `DistributedOptions.Coordinators` when non-empty. `WorkerOptions.TLS` overrides `DistributedOptions.TLS` when non-zero. `HealthPort` starts the worker health endpoint on that port; `0` disables it. | ||
|
|
||
| ## Repository Examples | ||
|
|
||
| The Dagu repository contains runnable examples: | ||
|
|
||
| ```sh | ||
| go run ./examples/embedded/local | ||
| go run ./examples/embedded/custom-executor | ||
| DAGU_COORDINATORS=127.0.0.1:50055 go run ./examples/embedded/distributed | ||
| ``` | ||
|
|
||
| The distributed example expects a coordinator to already be listening at the address in `DAGU_COORDINATORS`. | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: dagucloud/docs
Length of output: 653
🏁 Script executed:
Repository: dagucloud/docs
Length of output: 2217
🌐 Web query:
dagucloud/dagu package API documentation go💡 Result:
The dagucloud/dagu package refers to the Go module for Dagu, a lightweight workflow orchestration engine. Its API documentation is available on pkg.go.dev at https://pkg.go.dev/github.com/dagu-org/dagu (latest v1.30.3 as of 2026-01-04). Note that there appear to be multiple related repositories (dagu-org/dagu, dagucloud/dagu, yohamta/dagu), but dagu-org/dagu is the primary active one with the most recent pkg.go.dev page. This page provides standard Go package documentation including: - Module overview, license (GPL-3.0), Go version compatibility (1.25+). - Dependencies list. - README with installation, quick start, features (YAML workflows, Web UI, CLI, distributed execution via gRPC, AI agent, etc.). - Links to full docs at docs.dagu.sh, GitHub examples, and community. Dagu also exposes a REST API (documented at https://docs.dagu.sh/web-ui/api and https://docs.dagu.cloud/reference/api) for HTTP-based interaction (e.g., /api/v1 or /api/v2 endpoints for DAG management, health checks), but this is the server's HTTP API, not the Go package API. No separate Go client SDK is explicitly documented in results; use the core package for embedding or direct use. For programmatic control, interact via the REST API or gRPC (mentioned for distributed mode). Source code at https://github.com/dagucloud/dagu or https://github.com/dagu-org/dagu.
Citations:
🌐 Web query:
site:pkg.go.dev github.com/dagu-org/dagu New RunYAML WithParams💡 Result:
No explicit documentation or source code snippets for a "RunYAML WithParams" function or method in github.com/dagu-org/dagu were found in the search results. The pkg.go.dev page for github.com/dagu-org/dagu v1.30.3 (most recent, published 2026-01-04) provides general project information, installation, CLI usage like "dagu start hello.yaml", and YAML workflow examples with params in sub-DAG calls (e.g., params: "SOURCE=s3://bucket/data.csv"), but no Go API details matching the query. The official docs are referenced at docs.dagu.sh, but no specific Go library API for RunYAML is detailed in results. The CLI "dagu start <yaml>" likely handles YAML execution, possibly with params via flags or env vars, but no "WithParams" identified. Repository likely at https://github.com/dagu-org/dagu or https://github.com/dagucloud/dagu based on installer scripts.
Citations:
🌐 Web query:
github.com/dagu-org/dagu embedding API Go example💡 Result:
No public example exists for embedding the Dagu workflow engine API directly in Go code. Dagu (github.com/dagu-org/dagu) is primarily a standalone binary workflow engine using YAML-defined DAGs, with REST API for programmatic control when the server runs (docs.dagu.sh/overview/api). pkg.go.dev shows the module exists but no documented public embedding API or examples. An open GitHub issue (
#583) requests adding direct Go execution like dagu.Start("dagName", dagu.Params("key=val")), but it remains unresolved as of latest updates. To use Dagu programmatically, run the binary via exec.Command or interact via its gRPC/REST interfaces in distributed mode. For embedding workflows in Go, consider alternatives like github.com/rhosocial/go-dag.Citations:
🌐 Web query:
site:github.com dagucloud/dagu "func New" "RunYAML"💡 Result:
No function named "func New" referencing or using "RunYAML" was found in the dagucloud/dagu repository or its documentation across multiple searches targeting Go source code, GitHub files, and official docs. The repository is a Go-based workflow engine (https://github.com/dagucloud/dagu) where workflows are defined in YAML and executed via CLI commands like dagu start hello.yaml or API endpoints that accept inline YAML specs (e.g., POST /api/v2/dag-runs with "spec" field). There is an open issue (
#583) proposing a public Go API for direct DAG execution like dagu.Start("dagName"), but it remains unimplemented as of 2026-04-19. Code snippets mentioning YAML parsing exist (e.g., migration to goccy/go-yaml in#913), but no matching "func New" with "RunYAML". Searches for source files like internal/digraph/loader.go surfaced coverage reports but no explicit code with the terms. Official docs at docs.dagu.sh focus on YAML workflows and CLI/API usage, with no embedded Go library API documented.Citations:
🌐 Web query:
dagu-org/dagu Go API embedded example "dagu.New"💡 Result:
No complete, up-to-date embedded Go API example using dagu.New was found in official documentation or pkg.go.dev pages. Dagu (github.com/dagu-org/dagu) is primarily designed as a standalone binary for YAML-based workflow execution via CLI commands like dagu start workflow.yaml, with a web UI and HTTP/gRPC APIs for management. The pkg.go.dev page (v1.30.3) shows the module but no public Go API documentation for direct embedding like dagu.New. An open GitHub issue
#583(dated 2024-06-03, updated 2025-12-06, still open) requests exactly this feature: "Enable Direct Execution of DAGs via Go Code" with proposed API like dagu.Start("someDAG", dagu.Params("FOO=BAR")). This confirms the feature is planned but not implemented as of the latest sources (repo active as of 2026-04). For embedding workflows in Go applications, current alternatives are: - Execute the dagu binary as a subprocess using os/exec (common for CLI tools). - Use the HTTP/gRPC client APIs once server is running (dagu start-all exposes /api/v1). Example subprocess usage: package main import ( "os/exec" ) func main { cmd := exec.Command("dagu", "start", "hello.yaml") err := cmd.Run if err != nil { panic(err) } } Official docs at docs.dagu.sh focus on YAML/CLI/server usage, not programmatic Go embedding. Repo: https://github.com/dagu-org/dagu (3K+ stars, Go primary lang).Citations:
Remove or replace this example—the documented API does not exist in the actual dagucloud/dagu package.
The example shows
dagu.New(),engine.RunYAML(),run.Wait(), anddagu.WithParams()functions that are not currently implemented in thegithub.com/dagu-org/dagupackage. The package is primarily a CLI tool and server with REST/gRPC APIs, not a Go library for direct programmatic embedding. An open GitHub issue (#583, still unresolved) requests exactly this feature.Either remove this example, replace it with an actual working approach (e.g., executing the dagu binary via
os/execor using the REST API when the server runs), or clearly mark it as aspirational/future API subject to change.🤖 Prompt for AI Agents