Golang Walkthrough
This is a basic walkthrough of publishing Tasks and handling Tasks in Go. A more thorough guide will be written in time.
This is an introductory guide, we have extensive Go reference documentation.
This guide is known to work with Release 0.0.4
Connecting to JetStream
A connection to a JetStream server is needed, you can either prepare a connecting yourself or pass in the name of a NATS Context
NATS have a plethora of connection methods, security approaches, TLS or non TLS and even supports Websockets - you can configure it any way you wish. See the nats.go package for details.
First an example passing in a already prepared nats connection:
nc, err := nats.Connect("localhost:4222")
panicIfErr(err)
client, err := asyncjobs.NewClient(asyncjobs.NatsConn(nc))
panicIfErr(err)
Or if you have a NATS Context called AJC
you can use it:
client, err := asyncjobs.NewClient(asyncjobs.NatsContext("AJC"))
panicIfErr(err)
In both cases a number of options can be supplied to log disconnections, reconnections and more.
Configuring Queues
A Queue is where messages go, you can have many different, named, queues if you wish. If you do not specify any Queue a default one is made called DEFAULT
.
You might make different Queues to set different concurrency limits, different max attempts, maximum validity and more.
queue := &asyncjobs.Queue{Name: "EMAIL", MaxRunTime: 60 * time.Minute, MaxTries: 50}
client, err := asyncjobs.NewClient(asyncjobs.NatsContext("EMAIL"), asyncjobs.WorkQueue(queue))
panicIfErr(err)
Here we attach to or create a new queue called EMAIL
setting some specific options. If the queue already exist we will just attach but not update configuration. You can prevent on-demand creation by setting NoCreate: true
. See go doc for details.
Creating and Enqueueing Tasks
A task can be anything you wish as long as it can serialize to JSON. Tasks have types like email:new
, email-new
or really anything you want, we’ll see later how task types interact with the routing system.
Any number of producers can create tasks from any number of different processes.
First we have a simplistic helper to create a map that describes an email:
func newEmail(to, subject, body string) any {
return map[string]string{"to": to, "subject": subject, "body": body}
}
Now we can create a new email task and enqueue it:
client, err := asyncjobs.NewClient(
asyncjobs.NatsContext("EMAIL"),
asyncjobs.WorkQueue(&asyncjobs.Queue{Name: "EMAIL", NoCreate: true}))
panicIfErr(err)
email := newEmail("user@example.net", "Test Subject", "Test Body")
task, err := asyncjobs.NewTask("email:new", email)
panicIfErr(err)
err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)
The task is sent to the store and placed in the EMAIL
work queue for processing.
Consuming and Processing Tasks
Messages are consumed and handled by matching their type and from a specific Queue. Task processors can run concurrently across different processes and each processes can process a number of tasks concurrently. Per-process and per-Queue concurrency limits can be set.
We’ll show a few more options than typical to get a feel for what’s possible.
client, err := asyncjobs.NewClient(
asyncjobs.NatsContext("EMAIL"),
// 10 Tasks handled by this process concurrently
asyncjobs.ClientConcurrency(10),
// Prometheus stats on 0.0.0.0:8080/metrics
asyncjobs.PrometheusListenPort(8080),
// Logs using an already-prepared logger
asyncjobs.CustomLogger(log),
// Schedules retries on a jittering backoff between 1 and 10 minutes
asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes),
// Connects to a queue that should already exist
asyncjobs.BindWorkQueue("EMAIL"))
panicIfErr(err)
router := asyncjobs.NewTaskRouter()
err = router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
log.Printf("Processing task %s", task.ID)
// do work here using task.Payload
return "sent", nil
})
panicIfErr(err)
err = client.Run(ctx, routeR)
panicIfErr(err)
Here we registered one handler for email:new
and a callback that will handle that task up to 10 at a time.
Loading a task
Existing tasks can be loaded which will include their status and other details:
client, err := asyncjobs.NewClient(asyncjobs.NatsContext("EMAIL"))
panicIfErr(err)
task, err := client.LoadTaskByID("24Y0rDk7kMHYHKwMSCxQZOocLH3")
panicIfErr(err)