Task lifecycle
A task is the basic unit of work, scheduled and processed by a handler.
Task storage
Tasks are stored in NATS JetStream in a stream called CHORIA_AJ_TASKS. Each task lives on a subject keyed by its ID: CHORIA_AJ.T.<TASK ID>. Task IDs must be unique.
By default, task storage is file-based and non-replicated, with no limit on task count or retention. Setting an appropriate retention period is recommended, as shown below.
On a new setup, task storage can be initialized with custom defaults:
In code, the same initialization happens on the first client start:
Once created, the retention period can be adjusted:
Task properties
Tasks have several properties that influence processing:
| Property | Description |
|---|---|
| Type | A string like email:new; the router dispatches the task to any handler matching email:new, email, or "" |
| Payload | Content of the task that the handler reads to drive its behavior |
| Deadline | Checked before calling the handler; tasks past their deadline are not processed |
| MaxTries | Tasks that have already had this many tries are terminated; defaults to 10 since 0.0.8 |
| Dependencies | Task IDs that must complete successfully before this task runs, since 0.0.8 |
| LoadDependencies | For tasks with dependencies, load dependency TaskResults into DependencyResults before the handler, since 0.0.8 |
Setting other properties on new tasks should be avoided.
Task outcomes
During a task lifecycle, several properties are set. State is the main one and may be updated many times as described below. The others include:
| Property | Description |
|---|---|
| Queue | Once enqueued, the name of the queue holding the task |
| Result | On success, set with the time of completion and the result payload |
| State | As described below |
| CreatedAt | Timestamp of first enqueue |
| LastTriedAt | When not nil, the last timestamp at which a handler was called |
| Tries | Count of times the task has been sent to handlers |
| LastErr | When not empty, the text of the most recent error from the handler |
Task states
Tasks have many possible states. The processor updates the task as it traverses them.
| State | Description |
|---|---|
| TaskStateUnknown | No state is set; uninitialised or corrupt task |
| TaskStateNew | A brand new task, either never processed or possibly not enqueued |
| TaskStateActive | A task that is being handled by a handler |
| TaskStateRetry | A task that had a previous failure and is scheduled for a later retry, or one that was manually retried |
| TaskStateExpired | A task whose processing was attempted after its deadline had passed |
| TaskStateTerminated | A handler returned ErrTerminateTask, so no further retries are done |
| TaskStateCompleted | Successfully completed task |
| TaskStateQueueError | Task was created but the work queue entry could not be made |
| TaskStateBlocked | A task is waiting on its dependencies, since 0.0.8 |
| TaskStateUnreachable | A task cannot execute because one of its dependencies failed, since 0.0.8 |
Some termination cases are not reflected in the task state. For example, a task with no processor over the full queue retention window is orphaned without a final state update.
Task dependencies
Since 0.0.8, task dependencies are supported. A task with dependencies starts in TaskStateBlocked. When the processor schedules it, dependencies are checked; if all are complete, the task becomes active.
If any dependency reaches a final failure state, the task moves to TaskStateUnreachable, which is itself a final state.
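The decision made when a blocked task is scheduled can be sketched as a small function. This is a self-contained model, not the library's code. The TaskState string values are made up. The choice of expired, terminated and unreachable as the "final failure" states is an assumption of this sketch.

```go
package main

import "fmt"

// TaskState is a hypothetical stand-in for the library's state type; only
// the states this sketch needs are declared, with made-up string values.
type TaskState string

const (
	TaskStateActive      TaskState = "active"
	TaskStateBlocked     TaskState = "blocked"
	TaskStateCompleted   TaskState = "complete"
	TaskStateExpired     TaskState = "expired"
	TaskStateTerminated  TaskState = "terminated"
	TaskStateUnreachable TaskState = "unreachable"
)

// isFinalFailure treats expired, terminated and unreachable as final failure
// states; that list is an assumption of this sketch.
func isFinalFailure(s TaskState) bool {
	switch s {
	case TaskStateExpired, TaskStateTerminated, TaskStateUnreachable:
		return true
	}
	return false
}

// nextState decides what happens to a blocked task given the current states
// of its dependencies.
func nextState(deps []TaskState) TaskState {
	allDone := true
	for _, s := range deps {
		if isFinalFailure(s) {
			return TaskStateUnreachable // final: this task can never run
		}
		if s != TaskStateCompleted {
			allDone = false
		}
	}
	if allDone {
		return TaskStateActive
	}
	return TaskStateBlocked // still waiting on at least one dependency
}

func main() {
	fmt.Println(nextState([]TaskState{TaskStateCompleted, TaskStateActive}))
	fmt.Println(nextState([]TaskState{TaskStateCompleted, TaskStateCompleted}))
	fmt.Println(nextState([]TaskState{TaskStateCompleted, TaskStateTerminated}))
}
```

A failed dependency wins over an incomplete one. Once any dependency has failed for good, waiting on the others cannot change the outcome.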
Retrying a task
A task that still exists in the task store can be retried by ID. Any work queue items for the task are discarded, the state is set to TaskStateRetry, the Result is cleared, and the task is enqueued again for processing.
The CLI can also retry tasks with ajc task retry 24atXzUomFeTt4OK4yNJNafNQR3.
End state discard
Without additional configuration, tasks are retained for as long as the task store retention policy allows, which is forever by default.
The client can be configured to discard tasks in specific end states:
The client first saves the state change and then discards the task. Saving first allows lifecycle watchers to observe the terminal state. This behavior will change once #15 is resolved.
Flow diagram
The diagram includes the task relationships introduced in 0.0.8.
Routing, Handlers, Concurrency, and Retry
Processing tasks is the core of the system. The general case is straightforward, but several nuances are worth knowing.
Handlers execute tasks. Handlers are typically code supplied by the caller, written in Go.
For a non-Go solution, see Request-Reply Handlers. This page still provides useful grounding, since remote handlers map directly onto the same concepts.
Example
The following handler sends an email. The task payload is a serialized object describing the email. Deadlines and timeouts come from the context.
A task handler is a single-purpose piece of code capable of handling one type of task.
Routing tasks to handlers
Every client that processes tasks must be ready to handle every task found in the queue it consumes. A client connected to an EMAIL queue must handle every task on that queue.
A task with no matching handler fails and is retried.
Task delivery is handled by asyncjobs.Mux.
The router above dispatches email:new tasks to emailNewHandler and all other tasks to emailPassthroughHandler. A handler registered for email: would process all unmatched email-related tasks.
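The matching rule can be sketched as a longest-prefix lookup. This is a simplified model, not asyncjobs.Mux itself. It assumes the most specific registered prefix wins and that "" acts as the catch-all, which is consistent with the Type property described earlier. The router type and its handle and route methods are hypothetical, and handlers are represented by name only.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// router is a hypothetical model of type-based routing; handlers are
// represented by name so the matching logic stays visible.
type router struct {
	handlers map[string]string // prefix -> handler name
}

func (r *router) handle(prefix, name string) {
	if r.handlers == nil {
		r.handlers = map[string]string{}
	}
	r.handlers[prefix] = name
}

// route returns the handler registered under the longest prefix of taskType.
// An empty prefix matches every type and acts as a catch-all.
func (r *router) route(taskType string) (string, bool) {
	prefixes := make([]string, 0, len(r.handlers))
	for p := range r.handlers {
		prefixes = append(prefixes, p)
	}
	// Longest prefix first so the most specific registration wins.
	sort.Slice(prefixes, func(i, j int) bool { return len(prefixes[i]) > len(prefixes[j]) })
	for _, p := range prefixes {
		if strings.HasPrefix(taskType, p) {
			return r.handlers[p], true
		}
	}
	return "", false // unrouted: the task fails and is retried
}

func main() {
	r := &router{}
	r.handle("email:new", "emailNewHandler")
	r.handle("", "emailPassthroughHandler")
	for _, t := range []string{"email:new", "email:bounce", "sms:new"} {
		h, _ := r.route(t)
		fmt.Println(t, "->", h)
	}
}
```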
Middleware
Cross-cutting behavior such as logging, metrics, tracing, authentication, or panic recovery is typically expressed as middleware. A Middleware is a function that wraps a HandlerFunc and returns a new one:
Middleware should normally invoke next(ctx, log, t) and return its result and error unchanged. To short-circuit (for example, on an authentication failure), return without calling next. Always use the (ctx, log, t) arguments passed to the returned closure; capturing values from the surrounding scope at construction time will leak them across dispatches.
Global Middleware
Use Mux.Use to register middleware that applies to every handler:
Middleware registered earlier runs outermost, so the example above resolves at dispatch time to Recovery(Logging(emailNewHandler)). Put a recovery middleware first if you want it to catch panics from later middleware as well as the handler.
Use may be called before or after HandleFunc; existing handlers are rewrapped so subsequent dispatches see the new chain. Dispatches already in flight keep the chain they previously resolved.
Per-Route Middleware
HandleFunc accepts optional middleware that applies only to that route, inside any global middleware:
The dispatch chain is Logging(RequireSignedTask(emailNewHandler)) — global middleware always wraps per-route middleware, which always wraps the handler.
Reusable Bundles
asyncjobs.Chain composes several middlewares into one, useful when the same combination is reused across routes:
Order is preserved: Chain(a, b, c) runs a outermost, then b, then c.
Unrouted Tasks
The built-in handler returned for task types with no matching route is intentionally not wrapped by middleware. This keeps unrouted tasks from generating logging or metric noise; if you want to observe them, register a catch-all handler with HandleFunc("", ...) or a prefix that matches them.
Both Use and HandleFunc return ErrInvalidMiddleware if any supplied middleware is nil.
Concurrency
Two kinds of concurrency control are in effect at any time: client and queue.
Client concurrency
Every client can limit how many concurrent tasks it handles. A host with four cores might run only four handlers at a time.
Passing runtime.NumCPU() as the limit sizes maximum concurrency to the number of logical CPUs available to the process.
Queue concurrency
When many clients are active against a specific queue, each receives jobs up to its own limit. Overall concurrency across a queue can also be capped. With 10 clients, each allowing 10 concurrent tasks, the total would be 100; an infrastructure that only supports 50 at a time can enforce that on the queue.
This creates a new queue on first use and caps it at 50 concurrent handlers, regardless of how many clients start.
Adjust the value after creation with ajc queue configure EMAIL --concurrent 100.
Task runtime and max tries
The queue defines how long a task can be processed. A task still running past that timeout is retried, on the assumption that the handler has crashed. Choose the timeout carefully to avoid duplicate handling.
The queue above allows a task to be handled for up to one hour and retries up to 100 times. Choose these values with care.
The ajc CLI can adjust these values post-creation. Running clients still build context deadlines from the configuration present when they were started.
Terminating processing
The earlier example contained:
This returns the parse error from the handler, and the task is retried later. A bad payload will never parse; invalid JSON will always be invalid JSON. In that case, give up on the task immediately:
The returned error wraps asyncjobs.ErrTerminateTask. The task is terminated immediately, no further retries run, and the state is set to TaskStateTerminated.
Retry schedules
When the client determines that a task has failed and must be retried, it consults a RetryPolicy. The default retries at increasing intervals between one and ten minutes, with jitter applied.
To switch to a 50-step policy ranging from 10 minutes to 1 hour:
The predefined policies are RetryLinearTenMinutes, RetryLinearOneHour, and RetryLinearOneMinute.
Custom schedules, such as exponential backoff, can be built by populating asyncjobs.RetryPolicy or by implementing the asyncjobs.RetryPolicyProvider interface.
Request-Reply Handlers
Handlers are typically implemented in Go and compiled into the binary for best performance.
Other programming languages are supported through NATS Request-Reply. A remote service can be written in any language and called out to over NATS.
Review Routing, Handlers, Concurrency, and Retry first for background. Remote handlers map directly onto the same model.
Registering with the router
This registers with the router for tasks of type email:new. Tasks of that type are dispatched via NATS Request-Reply.
When all handlers in a deployment use request-reply, the Docker-based runner achieves the same outcome without any Go code.
Protocol
A lightweight JSON-plus-headers protocol communicates with the remote service. The protocol supports returning errors, including the ErrTerminateTask behavior.
The service must listen on CHORIA_AJ.H.T.email:new, typically in a NATS queue group, replacing email:new with the configured task type. A handler registered with task type "" handles all tasks, and the service listens on CHORIA_AJ.H.T.catchall.
Tasks
A request for a task handler carries these headers:
| Header | Value |
|---|---|
| AJ-Content-Type | application/x-asyncjobs-task+json |
| AJ-Handler-Deadline | 2009-11-10T23:00:00Z |
The content type is identical for all tasks. The deadline is a UTC timestamp by which the remote service must complete handling to avoid a timeout.
The body is a JSON-encoded Task.
Responses from the service may set these headers:
| Header | Description |
|---|---|
| AJ-Error | Indicates an error was encountered; the value is set as the task error and the task is retried later |
| AJ-Terminate | Terminates the task via ErrTerminateTask; the value becomes additional text in the error. No further retries are performed |
The body of the response is stored on the task unmodified.
Demonstration
The nats CLI tool shows this in action:
The CLI received the job with the two headers set and the expected payload, responded with success, and the task completed.
Lifecycle Events
Lifecycle events are small JSON messages published to notify observers about stages of task processing and client life.
Only one event type is supported today: a notification about changes to task state. More event types are planned, and a future release will emit CloudEvents-standard messages.
Events are informational. Delivery is not guaranteed and events are not persisted. They can loosely couple systems that react to task completion, but they must not be relied on for exactly-once or guaranteed notification.
Event types
Each event carries a type such as io.choria.asyncjobs.v1.task_state, exposed in Go as asyncjobs.TaskStateChangeEventType. The type string aids parsing and routing through other systems.
Parsing an event
A helper parses any supported event for use with the common Go type-switch pattern:
TaskStateChangeEvent
This event is published for any state change of a task. A task can be watched by ID, or all tasks can be watched at once.
These events are published to CHORIA_AJ.E.task_state.*, where the final token is the task ID.
On the wire, the messages look like this. The task_age field is a Go duration.
Security
Handlers sometimes run in untrusted locations and must only execute tasks from trusted creators.
Tasks can be signed with ed25519 private keys. Clients can be configured to accept only tasks created and signed with a specific key. When keys are configured, signatures on all tasks are required by default. Clients can also be configured to accept unsigned tasks, while still verifying signatures when present.
Create keys and save them to a file using hex.Encode():
Configure the client:
On the command line, ajc tasks supports --sign and --verify flags. Each accepts either a hex-encoded key or a path to a file containing a hex-encoded key.
Docker containers built with ajc package docker can set a key via the AJ_VERIFICATION_KEY environment variable. Optional signatures can be enabled at build time by setting task_signatures_optional: true in asyncjobs.yaml.
Terminology
Several terms recur throughout this system.
JetStream
The underlying storage and work queue manager. See the NATS project documentation for background.
Work queue
A work queue is a JetStream stream with the WorkQueue retention policy. The underlying stream backing the DEFAULT queue is called CHORIA_AJ_Q_DEFAULT.
Work item
Work items are placed in the work queue and scheduled by JetStream. The contents of the work queue are ProcessItem messages encoded as JSON.
Client
Connects to JetStream and manages the enqueueing and routing of tasks.
Handler
A handler is a function with the signature func(context.Context, asyncjobs.Logger, *asyncjobs.Task) (any, error).
Router
The router locates handlers for a task using the Type field as a matcher.
See Routing, Handlers, Concurrency, and Retry.
Task
A task is a specific kind of work item handled by a handler via the router. Tasks are the primary unit of work. Other kinds of work item are anticipated, such as scheduled items. At present, tasks are the only kind.
Tasks have timestamps, statuses, and more. See Task Lifecycle.
Lifecycle event
A lifecycle event is a small message published to notify listeners about state changes. Only task state changes are reported today. Processor start and stop events and similar signals will follow.
See Lifecycle Events.
Scheduled task
A cron-like schedule that creates tasks at the scheduled times. A task scheduler process must be running. See Scheduled Tasks.
HTTP API
The asyncjobs library can be exposed over an HTTP/JSON API so non-Go services and operators can enqueue tasks, manage queues, and inspect schedules without linking the library. The contract is OpenAPI 3.0; the canonical document lives at api/openapi.yaml.
The server is shipped as the httpserver subpackage and a launchable ajc server run subcommand.
Quickstart
Start the server on loopback and call it with curl:
ajc server run flags
| Flag | Default | Notes |
|---|---|---|
| --bind | 127.0.0.1:8080 | Listen address. Non-loopback requires --unsafe-bind or mTLS. |
| --tls-cert, --tls-key | (none) | Enable TLS. Both must be supplied together. |
| --tls-client-ca | (none) | CA bundle used to verify client certificates. Enables mTLS. |
| --queue | (none) | Work queue the API enqueues to. Required unless --allow-create-default. |
| --allow-create-default | false | Permit auto-creating the implicit DEFAULT queue. |
| --read-timeout | 30s | Per-request read timeout. |
| --write-timeout | 30s | Per-request write timeout. |
| --max-body | 524288 | Maximum request body size, in bytes (default 512 KiB). |
| --unsafe-bind | false | Acknowledge that a non-loopback bind exposes an unauthenticated server. |
The server binds to a dedicated http.ServeMux, never the default mux, and applies a MaxBytesReader to every request body. Request bodies and the Authorization header are never logged.
Authentication
The server performs no authentication and no authorization. The default loopback bind is sufficient for same-host integrations; anything else must be fronted by a reverse proxy or secured with mTLS.
A non-loopback --bind is rejected at startup unless one of the following holds:
- Reverse proxy (nginx, oauth2-proxy, envoy, caddy, Traefik, Tailscale, …): pass --unsafe-bind to acknowledge that you have fronted the server with a proxy that terminates authentication.
- mTLS: pass --tls-cert, --tls-key, and --tls-client-ca. When a client-CA bundle is configured, every TLS client must present a certificate chain that verifies against it. Certificate subjects are not mapped to scopes; successful verification alone grants full access.
/v1/info advertises the mode as auth: "none" or auth: "mtls" so callers and health checks can assert it externally.
Reverse-proxy requirements
Any fronting proxy must:
- Terminate authentication (bearer token, OIDC, SSO, mTLS, whatever fits) before reaching the asyncjobs listener.
- Preserve or strip request headers per the proxy’s own policy; asyncjobs itself reads none.
- Pass through 413 Payload Too Large and 429 Too Many Requests responses emitted by the backend.
Client requests rejected by the proxy receive the proxy’s error envelope (a 401 or 403), not the asyncjobs envelope — those status codes do not originate from this server.
Rotating mTLS material
There is no hot-reload for TLS or client-CA material; restart the server to pick up new files.
Endpoint catalog
All paths are versioned under /v1.
Meta
| Method | Path | Notes |
|---|---|---|
| GET | /healthz | Liveness probe. Does not touch storage. |
| GET | /readyz | Readiness probe. Returns the same body shape on 200/503. |
| GET | /v1/openapi.json | The full OpenAPI document, generated from the embedded YAML. |
| GET | /v1/info | Server version, auth mode, queue count, task count. |
| GET | /v1/retry-policies | The known retry policies and their intervals. |
Tasks
| Method | Path | Notes |
|---|---|---|
| POST | /v1/tasks | Enqueue a task. Honors Idempotency-Key. |
| GET | /v1/tasks | Snapshot list. Supports limit, state, queue, type, created_since, stream=ndjson. Rate-limited. |
| GET | /v1/tasks/{id} | Fetch a task. |
| DELETE | /v1/tasks/{id} | Remove a task from storage. |
| POST | /v1/tasks/{id}/retry | Retry a single task by id. |
| POST | /v1/tasks/retry | Bulk retry, capped at 100 ids; returns per-item results. |
Queues
| Method | Path | Notes |
|---|---|---|
| POST | /v1/queues | Create a queue. |
| GET | /v1/queues | List queues. Raw JetStream and consumer detail are never returned. |
| GET | /v1/queues/{name} | Queue detail. |
| DELETE | /v1/queues/{name} | Delete a queue. |
| POST | /v1/queues/{name}/purge | Purge all entries. |
Schedules
| Method | Path | Notes |
|---|---|---|
| POST | /v1/schedules | Create a scheduled task. |
| GET | /v1/schedules | List schedules. |
| GET | /v1/schedules/{name} | Schedule detail. |
| DELETE | /v1/schedules/{name} | Remove a schedule. |
Error envelope
Every error response shares the same shape:
code is a stable string drawn from a closed enum (invalid_argument, not_found, conflict, duplicate, rate_limited, payload_too_large, unavailable, signature_required, signature_invalid, dependency_failed, internal). Treat code as the machine-readable surface; message is for operators.
details.reason carries a library-specific identifier (e.g. task_not_found, queue_already_exists, payload_both_set) and lets clients branch on a finer-grained signal than code alone provides. details.field is set on validation failures to the offending field name.
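Clients can decode the envelope into a small struct and branch on code first, then on details.reason. This sketch assumes code, message and details sit at the top level of the JSON body, since the envelope example is not reproduced here. The apiError type, the decodeError and retryable helpers, and the example message text are hypothetical. Treating rate_limited and unavailable as retryable is a client policy choice, not something the server mandates.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError models the envelope fields named above, assuming a flat layout.
type apiError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	Details struct {
		Reason string `json:"reason,omitempty"`
		Field  string `json:"field,omitempty"`
	} `json:"details"`
}

func decodeError(body []byte) (apiError, error) {
	var e apiError
	err := json.Unmarshal(body, &e)
	return e, err
}

// retryable is a hypothetical client policy: back off and retry on
// rate_limited and unavailable, give up on everything else.
func retryable(e apiError) bool {
	switch e.Code {
	case "rate_limited", "unavailable":
		return true
	}
	return false
}

func main() {
	body := []byte(`{"code":"invalid_argument","message":"example message","details":{"reason":"payload_both_set","field":"payload"}}`)
	e, err := decodeError(body)
	if err != nil {
		panic(err)
	}
	// Branch on the stable code, then on the finer-grained reason.
	if e.Code == "invalid_argument" && e.Details.Reason == "payload_both_set" {
		fmt.Println("send only one of payload or payload_base64")
	}
	fmt.Println("retryable:", retryable(e))
}
```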
Quirks worth remembering
Payload encoding. Create requests accept either payload (any JSON value, server-encoded) or payload_base64 (pre-encoded bytes). Setting both yields HTTP 400 with details.reason: payload_both_set. Task and schedule responses always return payload as a base64 string; this matches the library’s []byte JSON encoding. Decode before use, and use payload_base64 when round-tripping a fetched task.
Duration vs RFC3339 dates. Queue duration fields (max_age, max_runtime) accept Go duration strings (30s, 5m) on requests but echo int64 nanoseconds on responses. Task deadline is an absolute RFC3339 timestamp. Schedule deadline_offset is a Go duration string applied at fire-time. The names differ deliberately so the schemas cannot silently collide.
Rate-limited list. GET /v1/tasks opens a fresh ephemeral JetStream consumer per call and is serialized server-side. Concurrent callers receive HTTP 429 (code: "rate_limited"). stream=ndjson tail mode is capped at roughly 60 seconds by the underlying library; clients should reconnect.
Default-queue guard. POST /v1/queues rejects name: "DEFAULT" unless the server was started with ajc server run --allow-create-default. The library would otherwise auto-create it, masking misconfigured deployments.
Signing is the caller’s responsibility. When the deployment has a TaskVerificationKey configured, the HTTP server does not sign on the caller’s behalf. Callers must supply a pre-computed signature field.
Idempotency. POST /v1/tasks honors the Idempotency-Key header for a 10-minute window: a repeat request with the same key returns the existing task as 200 with the same Location header rather than enqueuing twice.
Generating client SDKs
The OpenAPI document at /v1/openapi.json is OpenAPI 3.0.3 — emitted from the embedded YAML at server boot. Use a generator that supports 3.0:
For the openapi-generator-cli tool, pin the input version with --spec-version 3.0.3 if your generator defaults to 3.1; the asyncjobs spec does not use 3.1-only constructs but generators may emit warnings otherwise.
The repository also exposes the YAML directly at api/openapi.yaml for offline generation.