497 lines
15 KiB
Go
497 lines
15 KiB
Go
// Copyright 2011 Google Inc. All rights reserved.
|
|
// Use of this source code is governed by the Apache 2.0
|
|
// license that can be found in the LICENSE file.
|
|
|
|
/*
Package taskqueue provides a client for App Engine's taskqueue service.
Using this service, applications may perform work outside a user's request.

A Task may be constructed manually; alternatively, since the most common
taskqueue operation is to add a single POST task, NewPOSTTask makes it easy.

	t := taskqueue.NewPOSTTask("/worker", url.Values{
		"key": {key},
	})
	taskqueue.Add(c, t, "") // add t to the default queue
*/
|
|
package taskqueue
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"golang.org/x/net/context"
|
|
|
|
"google.golang.org/appengine"
|
|
"google.golang.org/appengine/internal"
|
|
dspb "google.golang.org/appengine/internal/datastore"
|
|
pb "google.golang.org/appengine/internal/taskqueue"
|
|
)
|
|
|
|
// ErrTaskAlreadyAdded is the error that Add and AddMulti report when a task
// with the given name has already been added.
var ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added")
|
|
|
|
// RetryOptions configures whether a task is retried and how long the
// service backs off between attempts.
type RetryOptions struct {
	// RetryLimit is the number of tries/leases after which the task fails
	// permanently and is deleted. When AgeLimit is set as well, the task only
	// fails permanently once both limits have been exceeded.
	RetryLimit int32

	// AgeLimit is the maximum time allowed since the task's first try before
	// the task fails permanently and is deleted (push tasks only). When
	// RetryLimit is set as well, the task only fails permanently once both
	// limits have been exceeded.
	AgeLimit time.Duration

	// MinBackoff is the minimum time between successive tries (push tasks only).
	MinBackoff time.Duration

	// MaxBackoff is the maximum time between successive tries (push tasks only).
	MaxBackoff time.Duration

	// MaxDoublings is the maximum number of times the interval between
	// successive tries is doubled before the intervals start increasing
	// linearly (push tasks only).
	MaxDoublings int32

	// ApplyZeroMaxDoublings must be set to true for a zero MaxDoublings to
	// override the default non-zero value. When it is false, a zero
	// MaxDoublings is ignored and the default is used.
	ApplyZeroMaxDoublings bool
}
|
|
|
|
// toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters.
|
|
func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters {
|
|
params := &pb.TaskQueueRetryParameters{}
|
|
if opt.RetryLimit > 0 {
|
|
params.RetryLimit = proto.Int32(opt.RetryLimit)
|
|
}
|
|
if opt.AgeLimit > 0 {
|
|
params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds()))
|
|
}
|
|
if opt.MinBackoff > 0 {
|
|
params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds())
|
|
}
|
|
if opt.MaxBackoff > 0 {
|
|
params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds())
|
|
}
|
|
if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) {
|
|
params.MaxDoublings = proto.Int32(opt.MaxDoublings)
|
|
}
|
|
return params
|
|
}
|
|
|
|
// A Task represents a task to be executed.
|
|
type Task struct {
|
|
// Path is the worker URL for the task.
|
|
// If unset, it will default to /_ah/queue/<queue_name>.
|
|
Path string
|
|
|
|
// Payload is the data for the task.
|
|
// This will be delivered as the HTTP request body.
|
|
// It is only used when Method is POST, PUT or PULL.
|
|
// url.Values' Encode method may be used to generate this for POST requests.
|
|
Payload []byte
|
|
|
|
// Additional HTTP headers to pass at the task's execution time.
|
|
// To schedule the task to be run with an alternate app version
|
|
// or backend, set the "Host" header.
|
|
Header http.Header
|
|
|
|
// Method is the HTTP method for the task ("GET", "POST", etc.),
|
|
// or "PULL" if this is task is destined for a pull-based queue.
|
|
// If empty, this defaults to "POST".
|
|
Method string
|
|
|
|
// A name for the task.
|
|
// If empty, a name will be chosen.
|
|
Name string
|
|
|
|
// Delay specifies the duration the task queue service must wait
|
|
// before executing the task.
|
|
// Either Delay or ETA may be set, but not both.
|
|
Delay time.Duration
|
|
|
|
// ETA specifies the earliest time a task may be executed (push queues)
|
|
// or leased (pull queues).
|
|
// Either Delay or ETA may be set, but not both.
|
|
ETA time.Time
|
|
|
|
// The number of times the task has been dispatched or leased.
|
|
RetryCount int32
|
|
|
|
// Tag for the task. Only used when Method is PULL.
|
|
Tag string
|
|
|
|
// Retry options for this task. May be nil.
|
|
RetryOptions *RetryOptions
|
|
}
|
|
|
|
func (t *Task) method() string {
|
|
if t.Method == "" {
|
|
return "POST"
|
|
}
|
|
return t.Method
|
|
}
|
|
|
|
// NewPOSTTask creates a Task that will POST to a path with the given form data.
|
|
func NewPOSTTask(path string, params url.Values) *Task {
|
|
h := make(http.Header)
|
|
h.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
return &Task{
|
|
Path: path,
|
|
Payload: []byte(params.Encode()),
|
|
Header: h,
|
|
Method: "POST",
|
|
}
|
|
}
|
|
|
|
// currentNamespace is the canonical form of the header key that carries the
// current namespace.
var currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")

// defaultNamespace is the canonical form of the header key that carries the
// default namespace.
var defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
|
|
|
|
func getDefaultNamespace(ctx context.Context) string {
|
|
return internal.IncomingHeaders(ctx).Get(defaultNamespace)
|
|
}
|
|
|
|
func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) {
|
|
if queueName == "" {
|
|
queueName = "default"
|
|
}
|
|
path := task.Path
|
|
if path == "" {
|
|
path = "/_ah/queue/" + queueName
|
|
}
|
|
eta := task.ETA
|
|
if eta.IsZero() {
|
|
eta = time.Now().Add(task.Delay)
|
|
} else if task.Delay != 0 {
|
|
panic("taskqueue: both Delay and ETA are set")
|
|
}
|
|
req := &pb.TaskQueueAddRequest{
|
|
QueueName: []byte(queueName),
|
|
TaskName: []byte(task.Name),
|
|
EtaUsec: proto.Int64(eta.UnixNano() / 1e3),
|
|
}
|
|
method := task.method()
|
|
if method == "PULL" {
|
|
// Pull-based task
|
|
req.Body = task.Payload
|
|
req.Mode = pb.TaskQueueMode_PULL.Enum()
|
|
if task.Tag != "" {
|
|
req.Tag = []byte(task.Tag)
|
|
}
|
|
} else {
|
|
// HTTP-based task
|
|
if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok {
|
|
req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum()
|
|
} else {
|
|
return nil, fmt.Errorf("taskqueue: bad method %q", method)
|
|
}
|
|
req.Url = []byte(path)
|
|
for k, vs := range task.Header {
|
|
for _, v := range vs {
|
|
req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
|
|
Key: []byte(k),
|
|
Value: []byte(v),
|
|
})
|
|
}
|
|
}
|
|
if method == "POST" || method == "PUT" {
|
|
req.Body = task.Payload
|
|
}
|
|
|
|
// Namespace headers.
|
|
if _, ok := task.Header[currentNamespace]; !ok {
|
|
// Fetch the current namespace of this request.
|
|
ns := internal.NamespaceFromContext(c)
|
|
req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
|
|
Key: []byte(currentNamespace),
|
|
Value: []byte(ns),
|
|
})
|
|
}
|
|
if _, ok := task.Header[defaultNamespace]; !ok {
|
|
// Fetch the X-AppEngine-Default-Namespace header of this request.
|
|
if ns := getDefaultNamespace(c); ns != "" {
|
|
req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
|
|
Key: []byte(defaultNamespace),
|
|
Value: []byte(ns),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
if task.RetryOptions != nil {
|
|
req.RetryParameters = task.RetryOptions.toRetryParameters()
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{
|
|
pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true,
|
|
pb.TaskQueueServiceError_TOMBSTONED_TASK: true,
|
|
}
|
|
|
|
// Add adds the task to a named queue.
|
|
// An empty queue name means that the default queue will be used.
|
|
// Add returns an equivalent Task with defaults filled in, including setting
|
|
// the task's Name field to the chosen name if the original was empty.
|
|
func Add(c context.Context, task *Task, queueName string) (*Task, error) {
|
|
req, err := newAddReq(c, task, queueName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res := &pb.TaskQueueAddResponse{}
|
|
if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil {
|
|
apiErr, ok := err.(*internal.APIError)
|
|
if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] {
|
|
return nil, ErrTaskAlreadyAdded
|
|
}
|
|
return nil, err
|
|
}
|
|
resultTask := *task
|
|
resultTask.Method = task.method()
|
|
if task.Name == "" {
|
|
resultTask.Name = string(res.ChosenTaskName)
|
|
}
|
|
return &resultTask, nil
|
|
}
|
|
|
|
// AddMulti adds multiple tasks to a named queue.
|
|
// An empty queue name means that the default queue will be used.
|
|
// AddMulti returns a slice of equivalent tasks with defaults filled in, including setting
|
|
// each task's Name field to the chosen name if the original was empty.
|
|
// If a given task is badly formed or could not be added, an appengine.MultiError is returned.
|
|
func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) {
|
|
req := &pb.TaskQueueBulkAddRequest{
|
|
AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)),
|
|
}
|
|
me, any := make(appengine.MultiError, len(tasks)), false
|
|
for i, t := range tasks {
|
|
req.AddRequest[i], me[i] = newAddReq(c, t, queueName)
|
|
any = any || me[i] != nil
|
|
}
|
|
if any {
|
|
return nil, me
|
|
}
|
|
res := &pb.TaskQueueBulkAddResponse{}
|
|
if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil {
|
|
return nil, err
|
|
}
|
|
if len(res.Taskresult) != len(tasks) {
|
|
return nil, errors.New("taskqueue: server error")
|
|
}
|
|
tasksOut := make([]*Task, len(tasks))
|
|
for i, tr := range res.Taskresult {
|
|
tasksOut[i] = new(Task)
|
|
*tasksOut[i] = *tasks[i]
|
|
tasksOut[i].Method = tasksOut[i].method()
|
|
if tasksOut[i].Name == "" {
|
|
tasksOut[i].Name = string(tr.ChosenTaskName)
|
|
}
|
|
if *tr.Result != pb.TaskQueueServiceError_OK {
|
|
if alreadyAddedErrors[*tr.Result] {
|
|
me[i] = ErrTaskAlreadyAdded
|
|
} else {
|
|
me[i] = &internal.APIError{
|
|
Service: "taskqueue",
|
|
Code: int32(*tr.Result),
|
|
}
|
|
}
|
|
any = true
|
|
}
|
|
}
|
|
if any {
|
|
return tasksOut, me
|
|
}
|
|
return tasksOut, nil
|
|
}
|
|
|
|
// Delete deletes a task from a named queue.
|
|
func Delete(c context.Context, task *Task, queueName string) error {
|
|
err := DeleteMulti(c, []*Task{task}, queueName)
|
|
if me, ok := err.(appengine.MultiError); ok {
|
|
return me[0]
|
|
}
|
|
return err
|
|
}
|
|
|
|
// DeleteMulti deletes multiple tasks from a named queue.
|
|
// If a given task could not be deleted, an appengine.MultiError is returned.
|
|
func DeleteMulti(c context.Context, tasks []*Task, queueName string) error {
|
|
taskNames := make([][]byte, len(tasks))
|
|
for i, t := range tasks {
|
|
taskNames[i] = []byte(t.Name)
|
|
}
|
|
if queueName == "" {
|
|
queueName = "default"
|
|
}
|
|
req := &pb.TaskQueueDeleteRequest{
|
|
QueueName: []byte(queueName),
|
|
TaskName: taskNames,
|
|
}
|
|
res := &pb.TaskQueueDeleteResponse{}
|
|
if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil {
|
|
return err
|
|
}
|
|
if a, b := len(req.TaskName), len(res.Result); a != b {
|
|
return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b)
|
|
}
|
|
me, any := make(appengine.MultiError, len(res.Result)), false
|
|
for i, ec := range res.Result {
|
|
if ec != pb.TaskQueueServiceError_OK {
|
|
me[i] = &internal.APIError{
|
|
Service: "taskqueue",
|
|
Code: int32(ec),
|
|
}
|
|
any = true
|
|
}
|
|
}
|
|
if any {
|
|
return me
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) {
|
|
if queueName == "" {
|
|
queueName = "default"
|
|
}
|
|
req := &pb.TaskQueueQueryAndOwnTasksRequest{
|
|
QueueName: []byte(queueName),
|
|
LeaseSeconds: proto.Float64(float64(leaseTime)),
|
|
MaxTasks: proto.Int64(int64(maxTasks)),
|
|
GroupByTag: proto.Bool(groupByTag),
|
|
Tag: tag,
|
|
}
|
|
res := &pb.TaskQueueQueryAndOwnTasksResponse{}
|
|
if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil {
|
|
return nil, err
|
|
}
|
|
tasks := make([]*Task, len(res.Task))
|
|
for i, t := range res.Task {
|
|
tasks[i] = &Task{
|
|
Payload: t.Body,
|
|
Name: string(t.TaskName),
|
|
Method: "PULL",
|
|
ETA: time.Unix(0, *t.EtaUsec*1e3),
|
|
RetryCount: *t.RetryCount,
|
|
Tag: string(t.Tag),
|
|
}
|
|
}
|
|
return tasks, nil
|
|
}
|
|
|
|
// Lease leases tasks from a queue.
|
|
// leaseTime is in seconds.
|
|
// The number of tasks fetched will be at most maxTasks.
|
|
func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) {
|
|
return lease(c, maxTasks, queueName, leaseTime, false, nil)
|
|
}
|
|
|
|
// LeaseByTag leases tasks from a queue, grouped by tag.
|
|
// If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA.
|
|
// leaseTime is in seconds.
|
|
// The number of tasks fetched will be at most maxTasks.
|
|
func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) {
|
|
return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag))
|
|
}
|
|
|
|
// Purge removes all tasks from a queue.
|
|
func Purge(c context.Context, queueName string) error {
|
|
if queueName == "" {
|
|
queueName = "default"
|
|
}
|
|
req := &pb.TaskQueuePurgeQueueRequest{
|
|
QueueName: []byte(queueName),
|
|
}
|
|
res := &pb.TaskQueuePurgeQueueResponse{}
|
|
return internal.Call(c, "taskqueue", "PurgeQueue", req, res)
|
|
}
|
|
|
|
// ModifyLease modifies the lease of a task.
|
|
// Used to request more processing time, or to abandon processing.
|
|
// leaseTime is in seconds and must not be negative.
|
|
func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error {
|
|
if queueName == "" {
|
|
queueName = "default"
|
|
}
|
|
req := &pb.TaskQueueModifyTaskLeaseRequest{
|
|
QueueName: []byte(queueName),
|
|
TaskName: []byte(task.Name),
|
|
EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership.
|
|
LeaseSeconds: proto.Float64(float64(leaseTime)),
|
|
}
|
|
res := &pb.TaskQueueModifyTaskLeaseResponse{}
|
|
if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil {
|
|
return err
|
|
}
|
|
task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3)
|
|
return nil
|
|
}
|
|
|
|
// QueueStatistics holds statistics about a single task queue.
type QueueStatistics struct {
	// Tasks is the number of tasks; it may be an approximation.
	Tasks int
	// OldestETA is zero if there are no pending tasks.
	OldestETA time.Time

	// Executed1Minute is the number of tasks executed in the last minute.
	Executed1Minute int
	// InFlight is the number of tasks executing now.
	InFlight int
	// EnforcedRate is expressed in requests per second.
	EnforcedRate float64
}
|
|
|
|
// QueueStats retrieves statistics about queues.
|
|
func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) {
|
|
req := &pb.TaskQueueFetchQueueStatsRequest{
|
|
QueueName: make([][]byte, len(queueNames)),
|
|
}
|
|
for i, q := range queueNames {
|
|
if q == "" {
|
|
q = "default"
|
|
}
|
|
req.QueueName[i] = []byte(q)
|
|
}
|
|
res := &pb.TaskQueueFetchQueueStatsResponse{}
|
|
if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil {
|
|
return nil, err
|
|
}
|
|
qs := make([]QueueStatistics, len(res.Queuestats))
|
|
for i, qsg := range res.Queuestats {
|
|
qs[i] = QueueStatistics{
|
|
Tasks: int(*qsg.NumTasks),
|
|
}
|
|
if eta := *qsg.OldestEtaUsec; eta > -1 {
|
|
qs[i].OldestETA = time.Unix(0, eta*1e3)
|
|
}
|
|
if si := qsg.ScannerInfo; si != nil {
|
|
qs[i].Executed1Minute = int(*si.ExecutedLastMinute)
|
|
qs[i].InFlight = int(si.GetRequestsInFlight())
|
|
qs[i].EnforcedRate = si.GetEnforcedRate()
|
|
}
|
|
}
|
|
return qs, nil
|
|
}
|
|
|
|
func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) {
|
|
x.Transaction = t
|
|
}
|
|
|
|
func init() {
|
|
internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name)
|
|
|
|
// Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue.
|
|
dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT)
|
|
internal.RegisterTimeoutErrorCode("taskqueue", dsCode)
|
|
|
|
// Transaction registration.
|
|
internal.RegisterTransactionSetter(setTransaction)
|
|
internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) {
|
|
for _, req := range x.AddRequest {
|
|
setTransaction(req, t)
|
|
}
|
|
})
|
|
}
|