This walkthrough covers publishing tasks and handling them in Go. A more thorough guide is planned. Complete Go reference documentation is available on pkg.go.dev.
The guide is known to work with Release 0.0.4.
Connecting to JetStream
A connection to a JetStream server is required. Either a prepared NATS connection or the name of a NATS Context can be passed in.
NATS supports many connection methods, security approaches, TLS or non-TLS, and websockets. See the nats.go package for details.
The call attaches to or creates a queue called EMAIL with specific options. If the queue exists, the client attaches without updating the configuration. Setting NoCreate: true prevents on-demand creation. See the Queue reference for details.
Creating and enqueueing tasks
A task can carry any payload that serializes to JSON. Task types such as email:new or email-new drive routing to handlers.
Any number of producers can create tasks from any number of processes.
A helper creates a map describing an email:
func newEmail(to, subject, body string) any {
	return map[string]string{"to": to, "subject": subject, "body": body}
}
A new email task can then be created and enqueued:
The task is sent to the store and placed in the EMAIL work queue for processing.
Consuming and processing tasks
Messages are consumed and handled by matching their type on a specific queue. Task processors can run concurrently across processes, and each process can handle multiple tasks concurrently. Per-process and per-queue concurrency limits are configurable.
The following example uses more options than typical:
client, err := asyncjobs.NewClient(
	asyncjobs.NatsContext("EMAIL"),

	// 10 Tasks handled by this process concurrently
	asyncjobs.ClientConcurrency(10),

	// Prometheus stats on 0.0.0.0:8080/metrics
	asyncjobs.PrometheusListenPort(8080),

	// Logs using an already-prepared logger
	asyncjobs.CustomLogger(log),

	// Schedules retries on a jittering backoff between 1 and 10 minutes
	asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes),

	// Connects to a queue that should already exist
	asyncjobs.BindWorkQueue("EMAIL"))
panicIfErr(err)

router := asyncjobs.NewTaskRouter()
err = router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
	log.Printf("Processing task %s", task.ID)

	// do work here using task.Payload

	return "sent", nil
})
panicIfErr(err)

err = client.Run(ctx, router)
panicIfErr(err)
The example registers one handler for email:new; the client processes up to 10 tasks concurrently across all registered handlers.
Loading a task
Existing tasks can be loaded, including their status and other details:
Introductory CLI Walkthrough
This walkthrough covers publishing tasks and handling them with the CLI. It mirrors the Introductory Golang Walkthrough, with shell commands in place of Go code.
The guide is known to work with Release 0.0.4. A video walkthrough covers the same material.
Requirements
The NATS CLI, access to a JetStream server, and the Async Jobs CLI are required.
$ go install github.com/choria-io/asyncjobs/ajc@v0.0.4
JetStream
For an existing JetStream server, add a context to connect to it:
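One way to do that with the NATS CLI; the context name and server URL below are placeholders for your environment:

```shell
# Save a context named AJC pointing at an existing JetStream server
nats context save AJC --server nats://broker.example.net:4222 --description "Async Jobs"
```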
A queue holds messages for processing. Many named queues can coexist. Without an explicit queue, a default called DEFAULT is used.
Different queues support different concurrency limits, maximum attempts, and validity periods.
$ ajc queue add EMAIL --run-time 1h --tries 50
EMAIL Work Queue:
Entries: 0 @ 0 B
Memory Based: false
Replicas: 1
Archive Period: forever
Max Task Tries: 50
Max Run Time: 1h0m0s
Max Concurrent: 100
Max Entries: unlimited
The command attaches to or creates a queue called EMAIL with specific options.
Creating and enqueueing tasks
A task can carry any payload that serializes to JSON. Task types such as email:new or email-new drive routing to handlers.
Any number of producers can create tasks from any number of processes.
$ ajc task add --queue EMAIL email:new '{"to":"user@example.net", "subject":"Test Subject", "body":"Test Body"}'
Enqueued task 24YUZF4MzOCLgI7kpwrGtT4lYnS
$ ajc task view 24YUZF4MzOCLgI7kpwrGtT4lYnS
Task 24YUZF4MzOCLgI7kpwrGtT4lYnS created at 02 Feb 22 13:04:26 UTC
Payload: 85 B
Status: new
Queue: EMAIL
Tries: 0
Consuming and processing tasks
The CLI can process tasks through a shell command. Create a basic command:
$ touch /tmp/send-email.sh
$ vi /tmp/send-email.sh
$ chmod a+x /tmp/send-email.sh
#!/bin/bash
echo '{"status":"success"}'
Run jobs from the EMAIL queue, five concurrently:
$ ajc task process "" EMAIL 5 /tmp/send-email.sh --monitor 8080
WARN[0000] Exposing Prometheus metrics on port 8080
INFO[0000] Running task 24YUZF4MzOCLgI7kpwrGtT4lYnS try 1
INFO[0000] Task 24YUZF4MzOCLgI7kpwrGtT4lYnS completed after 2.425439ms and 1 tries with 18 B payload
The empty task type matches all tasks. Prometheus stats are exposed at http://localhost:8080/metrics.
After processing, the task shows as done:
$ ajc task view 24YUZF4MzOCLgI7kpwrGtT4lYnS
Task 24YUZF4MzOCLgI7kpwrGtT4lYnS created at 02 Feb 22 13:04:26 UTC
Payload: 85 B
Status: complete
Completed: 02 Feb 22 13:07:09 UTC (2m42s)
Queue: EMAIL
Tries: 1
$ ajc queue ls
╭─────────────────────────────────────────────────────────────────────────────╮
│ Work Queues │
├─────────┬───────┬───────┬──────────┬───────────┬───────────┬────────────────┤
│ Name │ Items │ Size │ Replicas │ Max Tries │ Max Items │ Max Concurrent │
├─────────┼───────┼───────┼──────────┼───────────┼───────────┼────────────────┤
│ EMAIL │ 0 │ 0 B │ 1 │ 50 │ unlimited │ 100 │
│ DEFAULT │ 3 │ 489 B │ 1 │ 100 │ unlimited │ 100 │
╰─────────┴───────┴───────┴──────────┴───────────┴───────────┴────────────────╯
List tasks and their status:
$ ajc task ls
╭───────────────────────────────────────────────────────────────────────────────────────────────╮
│ 2 Tasks │
├─────────────────────────────┬───────────┬────────────────────────┬──────────┬─────────┬───────┤
│ ID │ Type │ Created │ State │ Queue │ Tries │
├─────────────────────────────┼───────────┼────────────────────────┼──────────┼─────────┼───────┤
│ 24YUZF4MzOCLgI7kpwrGtT4lYnS │ email:new │ 02 Feb 22 13:04:26 UTC │ complete │ EMAIL │ 1 │
│ 24YV5AyE6epR8XMpIdIzcppYyBK │ email:new │ 02 Feb 22 13:08:41 UTC │ complete │ EMAIL │ 1 │
╰─────────────────────────────┴───────────┴────────────────────────┴──────────┴─────────┴───────╯
Show a general overview:
$ ajc info
Tasks Storage:
Entries: 5 @ 2.2 KiB
Memory Based: false
Replicas: 1
Archive Period: forever
First Entry: 02 Feb 22 13:01:19 UTC (14m24s)
Last Update: 02 Feb 22 13:08:41 UTC (7m2s)
DEFAULT Work Queue:
Entries: 3 @ 489 B
Memory Based: false
Replicas: 1
Archive Period: forever
Max Task Tries: 100
Max Run Time: 1m0s
Max Concurrent: 100
Max Entries: unlimited
First Item: 02 Feb 22 13:01:19 UTC (14m24s)
Last Item: 02 Feb 22 13:02:16 UTC (13m27s)
EMAIL Work Queue:
Entries: 0 @ 0 B
Memory Based: false
Replicas: 1
Archive Period: forever
Max Task Tries: 50
Max Run Time: 1h0m0s
Max Concurrent: 100
Max Entries: unlimited
Last Item: 02 Feb 22 13:08:41 UTC (7m2s)
Scheduled Tasks
The Task Scheduler supports cron-like entries that create tasks on demand.
A separate supervisor process evaluates the configured schedules and creates the tasks. The supervisor is built into the ajc binary and is deployable in any container manager.
The scheduler supports highly-available clusters. Members perform leader election, and one member schedules tasks at a time. Restarts and signals are not required when schedules are added, removed, or updated.
Deploying to Kubernetes is supported through Helm charts. The scheduler can also run anywhere else, as described below.
Schedule overview
A scheduled task combines a cron-like schedule with task properties. The scheduler creates new jobs on demand, using those properties as templates.
// ScheduledTask represents a cron-like schedule and task properties that
// result in new tasks being created regularly, matching the schedule
type ScheduledTask struct {
	// Name is a unique name for the scheduled task
	Name string `json:"name"`

	// Schedule is a cron specification for the schedule
	Schedule string `json:"schedule"`

	// Queue is the name of a queue to enqueue the task into
	Queue string `json:"queue"`

	// TaskType is the type of task to create
	TaskType string `json:"task_type"`

	// Payload is the task payload for the enqueued tasks
	Payload []byte `json:"payload"`

	// Deadline is the time after scheduling that the deadline would be
	Deadline time.Duration `json:"deadline,omitempty"`

	// MaxTries is how many times the created task could be tried
	MaxTries int `json:"max_tries"`

	// CreatedAt is when the schedule was created
	CreatedAt time.Time `json:"created_at"`
}
Valid schedules use the common unix cron format, including syntax like */5. Shortcuts such as @yearly, @monthly, @weekly, @daily, and @hourly are supported, along with @every 10m, where 10m is a Go standard duration.
Go API
Adding and loading scheduled tasks resembles working with a normal task:
client, _ := asyncjobs.NewClient(asyncjobs.NatsContext("AJC"))

// The deadline being an hour from now results in a Scheduled Task with a 1 hour deadline set
task, _ := asyncjobs.NewTask("email:monthly", nil, asyncjobs.TaskDeadline(time.Now().Add(time.Hour)))

// Create the schedule
err := client.NewScheduledTask("EMAIL_MONTHLY_UPDATE", "@monthly", "EMAIL", task)

// Load it
st, _ := client.LoadScheduledTaskByName("EMAIL_MONTHLY_UPDATE")

// Remove it
err = client.RemoveScheduledTask("EMAIL_MONTHLY_UPDATE")
CLI management
The CLI is new, and some aspects may change.
Adding and removing scheduled tasks
$ ajc task cron add EMAIL_MONTHLY_UPDATE "0 0 1 * *" email:monthly --queue EMAIL --deadline 12h
Scheduled Task EMAIL_MONTHLY_UPDATE created at 17 Feb 22 17:40:37 UTC
Schedule: 0 0 1 * *
Queue: EMAIL
Task Type: email:monthly
Payload: 0 B
Scheduling Deadline: 12h0m0s
The command adds a scheduled task that runs monthly. Each run creates a task of type email:monthly in the EMAIL queue with a 12-hour deadline from creation.
Shortcuts are entered without the @ prefix: monthly stands for @monthly and yearly for @yearly; the CLI supplies the @.
Remove a scheduled task by name with ajc task cron delete EMAIL_MONTHLY_UPDATE. The -f flag makes the command non-interactive.
Viewing schedules
List all schedules:
$ ajc task cron list
╭───────────────────────────────────────────────────────────────────────────────────╮
│ 1 Scheduled Task(s) │
├──────────────────────┬───────────┬───────┬───────────────┬────────────────────────┤
│ Name │ Schedule │ Queue │ Task Type │ Created │
├──────────────────────┼───────────┼───────┼───────────────┼────────────────────────┤
│ EMAIL_MONTHLY_UPDATE │ 0 0 1 * * │ EMAIL │ email:monthly │ 17 Feb 22 17:40:37 UTC │
╰──────────────────────┴───────────┴───────┴───────────────┴────────────────────────╯
$ ajc task cron list --names
EMAIL_MONTHLY_UPDATE
Deploying to Kubernetes is supported through Helm charts. The scheduler can also run elsewhere using the CLI below.
Each scheduler needs a name, ideally unique. The name appears in logs and in leader elections.
$ ajc task cron scheduler $(hostname -f) --monitor 8080 --context AJC
INFO[19:21:29] Starting leader election as dev1.example.net
INFO[19:21:29] Registered a new item EMAIL_MONTHLY_UPDATE on queue EMAIL: 0 0 1 * *
INFO[19:21:29] Loaded 1 scheduled task(s)
INFO[19:21:42] Became leader, tasks will be scheduled
Logs record schedule additions, schedule removals, and task-creation events. Prometheus metrics are served at port 8080 on /metrics.
Any number of schedulers can run. Members perform leader election. All members log schedule updates, but only the leader creates tasks.
Note
During a leadership change, some tasks may not be scheduled. A later release will address this.
The ajc info output includes scheduler state:
$ ajc info
...
Leader Elections:
Entries: 1 @ 139 B
Memory Based: false
Replicas: 1
Elections:
task_scheduler: dev1.example.net
...
The current leader of the task_scheduler election group appears in the output.
Handlers in Docker
Version 0.0.4 introduces a packager that builds handler containers from configuration.
Preparing handlers
Go based
One handler per Go package is recommended. The packager pulls configured handlers into a small microservice.
package handler

import (
	"context"

	aj "github.com/choria-io/asyncjobs"
)

func AsyncJobHandler(ctx context.Context, log aj.Logger, task *aj.Task) (any, error) {
	// process your email, then return a result that is stored with the task
	return "success", nil
}
Place this in any package, for example git.example.com/example/email/new. Multiple handlers are supported as long as each resides in its own package.
Other languages
Other languages are supported through NATS Request-Reply. Implement them against the protocol described in Remote Request-Reply Handlers.
Packaging
In an empty directory, create a file asyncjobs.yaml with the following content:
# The NATS Context to connect with.
#
# Same as NatsContext() client option
nats: AJ_EMAIL

# The Work Queue to consume.
#
# Same as BindWorkQueue() client option
queue: EMAIL

# The package name to generate
name: git.example.com/example

# The version of github.com/choria-io/asyncjobs to use,
# something go get would accept. Defaults to the same
# as the CLI version
asyncjobs: latest

# Use the RetryLinearTenMinutes retry policy,
#
# Equivalent to client RetryBackoffPolicyName() option
retry: 10m

# Discard tasks that reach complete state.
#
# Same as DiscardTaskStates() client option
discard:
  - completed

# List of Task handlers
tasks:
  - type: email:new
    package: git.example.com/example/email/new
    version: v0.2.0
  - type: audit:log
    remote: true
  - type: webhook:call
    command: webhook/call.sh
The webhook:call handler is a shell script that must exist at commands/webhook/call.sh and is copied into the container. Handlers often need dependencies absent from the default container. Use the generated container as a FROM base and add dependencies through the alpine package system.
Generate the package:
$ ajc package docker
╭────────────────────────────────────────────────────────────────╮
│ Handler Microservice Settings │
├────────────────────────────────┬───────────────────────────────┤
│ Package Name │ git.example.com/example │
│ NATS Context Name │ AJ_EMAIL │
│ Work Queue │ EMAIL │
│ Task Handlers │ 2 │
│ github.com/choria-io/asyncjobs │ latest │
╰────────────────────────────────┴───────────────────────────────╯
╭───────────────────────────────────────────────────────────────────────────────────────────────╮
│ Handler Configuration and Packages │
├──────────────┬───────────────────────┬────────────────────────────────────────────────────────┤
│ Task Type │ Handler Kind │ Detail │
├──────────────┼───────────────────────┼────────────────────────────────────────────────────────┤
│ email:new │ Go Package │ git.example.com/example/email/new@v0.2.0 │
│ webhook:call │ External Command │ webhook/call.sh │
│ audit:log │ Request-Reply Service │ CHORIA_AJ.H.T.audit:log │
╰──────────────┴───────────────────────┴────────────────────────────────────────────────────────╯
Build your container using 'docker build'
$ ls -l
total 12
-rw-rw-r-- 1 rip rip 166 Feb 8 17:48 asyncjobs.yaml
-rw-r--r-- 1 rip rip 713 Feb 8 20:01 Dockerfile
-rw-r--r-- 1 rip rip 2540 Feb 8 20:01 main.go
drwxrwxr-x 3 rip rip 19 Feb 8 17:48 commands
A main.go is generated and built into a container:
$ docker run -ti -v "/home/myname/work/email_service/nats:/handler/config/nats" -p 8080:8080 --rm example/email:latest
INFO[19:07:39] Connecting using Context AJ_EMAIL consuming work queue EMAIL with concurrency 4
WARN[19:07:39] Exposing Prometheus metrics on port 8080
The nats configuration directory is mounted to /handler/config/nats, where the container looks for the context. Additional files such as credentials can be placed in the container and referenced at their in-container paths.
Environment configuration
The following environment variables influence the container:
Variable          Description                                                             YAML Item
AJ_WORK_QUEUE     The name of the queue to connect to                                     queue
AJ_NATS_CONTEXT   The NATS context to use for connectivity                                nats
XDG_CONFIG_HOME   The prefix for NATS context configuration, defaults to /handler/config
AJ_CONCURRENCY    The number of workers to run, defaults to runtime.NumCPU()
AJ_DEBUG          Set to 1 to enable debug logging
AJ_RETRY_POLICY   Sets the retry backoff policy, one of 10m, 1h, 1m, default              retry
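For example, the container built in the packaging section could be started with these variables overriding its configuration; the image name, mount path, and values are the walkthrough's examples:

```shell
docker run -ti --rm \
  -v "$(pwd)/nats:/handler/config/nats" \
  -e AJ_WORK_QUEUE=EMAIL \
  -e AJ_NATS_CONTEXT=AJ_EMAIL \
  -e AJ_CONCURRENCY=4 \
  -e AJ_DEBUG=1 \
  -p 8080:8080 \
  example/email:latest
```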
Handlers in K8s
Helm charts are published to deploy the system to Kubernetes.
Requirements
NATS Server with JetStream
A NATS JetStream server is required. Choria users can enable Choria Streams. The NATS community publishes its own NATS Helm Charts.
Connection context
NATS Contexts configure the connection between asyncjobs and NATS. For an existing context configured through the NATS CLI, run nats context show CONTEXTNAME --json to retrieve the keys and values.
TLS certificates for NATS authentication can be stored in a secret called task-scheduler-tls. NATS credential files and similar data fit the same pattern: