Routing, Concurrency, Retry

Processing Tasks is what it’s all about, so, this is an important topic to explore and understand. It is quite simple in the general case but there are some nuances to be aware of.

Handlers are how Tasks get executed, typically this is code you provide written in Go.

For a non Go solution see Remote Request Reply Handlers, but reading this page is still good for a grounding understanding since remote Handlers map exactly onto the same concepts.

Example

Below is a handler that sends an email, the task Payload is a serialized object describing an email to send. The deadlines and timeouts are extracted from the Context and the mail is sent.

The Task handler then is a single-purpose piece of code capable of handling 1 type of Task.

func emailNewHandler(ctx context.Context, log asycjobs.Logger, task *asyncjobs.Task) (any, error) {
	// Parse the task payload into an email
	email, err := parseEmail(task.Payload)
	if err != nil { return nil, err }

	// extract the deadline from the context
	deadline, ok := ctx.Deadline()
	if !ok { deadline = time.Now().Add(time.Minute) }

	// send the email using github.com/xhit/go-simple-mail
	server := mail.NewSMTPClient()
        server.Host = "smtp.example.com"
	server.Port = 587
	server.ConnectTimeout = 2 * time.Second
	server.SendTimeout = time.Until(deadline)
	
	client, err := server.Connect()
	if err != nil { return nil, err }

	client.SetFrom(EmailFrom).AddTo(email.To).SetSubject(email.Subject).SetBody(mail.TextHTML, email.Body)
	err = email.Send(client)
	if err != nil { return nil, err }

	log.Infof("Sent an email to %s", email.To)

        return "success", nil
}

Routing Tasks to Handlers

Every client that processes messages must be ready to process all messages found in the Queue. So if you have an EMAIL queue, all running clients must be able to handle all Tasks.

Should there be no appropriate handler the message will fail and enter retries.

Task delivery is handled by asyncjobs.Mux which today is quite minimal, we plan to support Middleware and more later.

router := asyncjobs.NewTaskRouter()
router.HandleFunc("email:new", emailNewHandler)
router.HandleFunc("", emailPassthroughHandler)

client.Run(ctx, router)

Here we set up the above example handler to handle email:new messages and register an handler for other messages. A handler could be set to handle email: messages and it would process all unhandled email related messages.

Concurrency

There are 2 kinds of Concurrency control in effect at any time: Client and Queue.

Client Concurrency

Every client can limit how many concurrent tasks it wish to handle. You might have 4 cores in your instance and so want to run only 4 Handlers at a time.

client, err := asyncjobs.NewClient(asyncjobs.ClientConcurrency(runtime.NumCPU()))

Here we set the client to use runtime.NumCPU() to dynamically allocate maximum concurrency based on available logical CPUs.

Queue Concurrency

When many clients are active against a specific Queue they would all get jobs according to the limit above. You might also want to limit the overall concurrency of all email processing regardless of how many clients you have. With 10 clients each set to allow 10 concurrent you would be handling 100 tasks, but if you know your infrastructure can only support 50 at a time you can limit this on the Queue.

queue := &asyncjobs.Queue{
	Name: "EMAIL",
	MaxConcurrent: 50
}
client, err := asyncjobs.NewClient(asyncjobs.WorkQueue(queue))

This would create a new Queue the first time and set it to 50 maximum concurrent handlers - regardless of how many your clients start.

You can adjust this once created using ajc queue configure EMAIL --concurrent 100.

Task Runtime and Max Tries

The Queue defines how long a Task can be processed, a Task that is not done being processed by that timeout will result in a retry - on the assumption that the handler has crashed. You should set the timeout carefully to avoid duplicate task handling.

queue := &asyncjobs.Queue{
	Name: "EMAIL",
	MaxRunTime: time.Hour,
	MaxTries: 100,
}

Above we define a Queue that will allow a task to be handled for up to 1 hour and will retry it 100 times. Care should be taken to pick these values correctly.

The ajc command line utility can adjust these times post-creation but running clients will still create context Deadlines based on the configuration that was set when they were started.

Terminating Processing

In the earlier example we had these 2 lines:

email, err := parseEmail(task.Payload)
if err != nil { return nil, err }

This would return the parse error from your Handler, the task would then go and get retried later. Thing is if this is a bad Payload it will never pass processing, invalid JSON will always be invalid JSON. You might want to give up on the task early:

email, err := parseEmail(task.Payload)
if err != nil { 
	return nil, fmt.Errorf("invalid task payload: %w", asyncjobs.ErrTerminateTask)
}

Here we return an error that is a asyncjobs.ErrTerminateTask, the task would then be terminated immediately, no future tries will be done and the task state will be set to TaskStateTerminated.

Retry Schedules

When a client determines that a Task has failed and needs to be retried it does so based on a RetryPolicy. The default policy is to retry at increasing intervals between 1 minute and 10 minutes with a jitter applied.

To change to a 50 step policy ranging between 10 minutes and 1 hour use:

client, err := asyncjobs.NewClient(RetryBackoffPolicy(asyncjobs.RetryLinearOneHour))

We have RetryLinearTenMinutes, RetryLinearOneHour and RetryLinearOneMinute pre-defined.

You can create your own schedule - perhaps based on an exponential backoff - by filling in your values in asyncjobs.RetryPolicy or by implementing the asyncjobs.RetryPolicyProvider interface.