// Copyright 2011 Google Inc. All rights reserved. // Use of this source code is governed by the Apache 2.0 // license that can be found in the LICENSE file. /* Package taskqueue provides a client for App Engine's taskqueue service. Using this service, applications may perform work outside a user's request. A Task may be constructed manually; alternatively, since the most common taskqueue operation is to add a single POST task, NewPOSTTask makes it easy. t := taskqueue.NewPOSTTask("/worker", url.Values{ "key": {key}, }) taskqueue.Add(c, t, "") // add t to the default queue */ package taskqueue import ( "errors" "fmt" "net/http" "net/url" "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/appengine" "google.golang.org/appengine/internal" dspb "google.golang.org/appengine/internal/datastore" pb "google.golang.org/appengine/internal/taskqueue" ) var ( // ErrTaskAlreadyAdded is the error returned by Add and AddMulti when a task has already been added with a particular name. ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added") ) // RetryOptions let you control whether to retry a task and the backoff intervals between tries. type RetryOptions struct { // Number of tries/leases after which the task fails permanently and is deleted. // If AgeLimit is also set, both limits must be exceeded for the task to fail permanently. RetryLimit int32 // Maximum time allowed since the task's first try before the task fails permanently and is deleted (only for push tasks). // If RetryLimit is also set, both limits must be exceeded for the task to fail permanently. AgeLimit time.Duration // Minimum time between successive tries (only for push tasks). MinBackoff time.Duration // Maximum time between successive tries (only for push tasks). MaxBackoff time.Duration // Maximum number of times to double the interval between successive tries before the intervals increase linearly (only for push tasks). MaxDoublings int32 // If MaxDoublings is zero, set ApplyZeroMaxDoublings to true to override the default non-zero value. // Otherwise a zero MaxDoublings is ignored and the default is used. ApplyZeroMaxDoublings bool } // toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters. func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters { params := &pb.TaskQueueRetryParameters{} if opt.RetryLimit > 0 { params.RetryLimit = proto.Int32(opt.RetryLimit) } if opt.AgeLimit > 0 { params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds())) } if opt.MinBackoff > 0 { params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds()) } if opt.MaxBackoff > 0 { params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds()) } if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) { params.MaxDoublings = proto.Int32(opt.MaxDoublings) } return params } // A Task represents a task to be executed. type Task struct { // Path is the worker URL for the task. // If unset, it will default to /_ah/queue/. Path string // Payload is the data for the task. // This will be delivered as the HTTP request body. // It is only used when Method is POST, PUT or PULL. // url.Values' Encode method may be used to generate this for POST requests. Payload []byte // Additional HTTP headers to pass at the task's execution time. // To schedule the task to be run with an alternate app version // or backend, set the "Host" header. Header http.Header // Method is the HTTP method for the task ("GET", "POST", etc.), // or "PULL" if this is task is destined for a pull-based queue. // If empty, this defaults to "POST". Method string // A name for the task. // If empty, a name will be chosen. Name string // Delay specifies the duration the task queue service must wait // before executing the task. // Either Delay or ETA may be set, but not both. Delay time.Duration // ETA specifies the earliest time a task may be executed (push queues) // or leased (pull queues). // Either Delay or ETA may be set, but not both. ETA time.Time // The number of times the task has been dispatched or leased. RetryCount int32 // Tag for the task. Only used when Method is PULL. Tag string // Retry options for this task. May be nil. RetryOptions *RetryOptions } func (t *Task) method() string { if t.Method == "" { return "POST" } return t.Method } // NewPOSTTask creates a Task that will POST to a path with the given form data. func NewPOSTTask(path string, params url.Values) *Task { h := make(http.Header) h.Set("Content-Type", "application/x-www-form-urlencoded") return &Task{ Path: path, Payload: []byte(params.Encode()), Header: h, Method: "POST", } } var ( currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace") ) func getDefaultNamespace(ctx context.Context) string { return internal.IncomingHeaders(ctx).Get(defaultNamespace) } func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) { if queueName == "" { queueName = "default" } path := task.Path if path == "" { path = "/_ah/queue/" + queueName } eta := task.ETA if eta.IsZero() { eta = time.Now().Add(task.Delay) } else if task.Delay != 0 { panic("taskqueue: both Delay and ETA are set") } req := &pb.TaskQueueAddRequest{ QueueName: []byte(queueName), TaskName: []byte(task.Name), EtaUsec: proto.Int64(eta.UnixNano() / 1e3), } method := task.method() if method == "PULL" { // Pull-based task req.Body = task.Payload req.Mode = pb.TaskQueueMode_PULL.Enum() if task.Tag != "" { req.Tag = []byte(task.Tag) } } else { // HTTP-based task if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok { req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum() } else { return nil, fmt.Errorf("taskqueue: bad method %q", method) } req.Url = []byte(path) for k, vs := range task.Header { for _, v := range vs { req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ Key: []byte(k), Value: []byte(v), }) } } if method == "POST" || method == "PUT" { req.Body = task.Payload } // Namespace headers. if _, ok := task.Header[currentNamespace]; !ok { // Fetch the current namespace of this request. ns := internal.NamespaceFromContext(c) req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ Key: []byte(currentNamespace), Value: []byte(ns), }) } if _, ok := task.Header[defaultNamespace]; !ok { // Fetch the X-AppEngine-Default-Namespace header of this request. if ns := getDefaultNamespace(c); ns != "" { req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ Key: []byte(defaultNamespace), Value: []byte(ns), }) } } } if task.RetryOptions != nil { req.RetryParameters = task.RetryOptions.toRetryParameters() } return req, nil } var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{ pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true, pb.TaskQueueServiceError_TOMBSTONED_TASK: true, } // Add adds the task to a named queue. // An empty queue name means that the default queue will be used. // Add returns an equivalent Task with defaults filled in, including setting // the task's Name field to the chosen name if the original was empty. func Add(c context.Context, task *Task, queueName string) (*Task, error) { req, err := newAddReq(c, task, queueName) if err != nil { return nil, err } res := &pb.TaskQueueAddResponse{} if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil { apiErr, ok := err.(*internal.APIError) if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] { return nil, ErrTaskAlreadyAdded } return nil, err } resultTask := *task resultTask.Method = task.method() if task.Name == "" { resultTask.Name = string(res.ChosenTaskName) } return &resultTask, nil } // AddMulti adds multiple tasks to a named queue. // An empty queue name means that the default queue will be used. // AddMulti returns a slice of equivalent tasks with defaults filled in, including setting // each task's Name field to the chosen name if the original was empty. // If a given task is badly formed or could not be added, an appengine.MultiError is returned. func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) { req := &pb.TaskQueueBulkAddRequest{ AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)), } me, any := make(appengine.MultiError, len(tasks)), false for i, t := range tasks { req.AddRequest[i], me[i] = newAddReq(c, t, queueName) any = any || me[i] != nil } if any { return nil, me } res := &pb.TaskQueueBulkAddResponse{} if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil { return nil, err } if len(res.Taskresult) != len(tasks) { return nil, errors.New("taskqueue: server error") } tasksOut := make([]*Task, len(tasks)) for i, tr := range res.Taskresult { tasksOut[i] = new(Task) *tasksOut[i] = *tasks[i] tasksOut[i].Method = tasksOut[i].method() if tasksOut[i].Name == "" { tasksOut[i].Name = string(tr.ChosenTaskName) } if *tr.Result != pb.TaskQueueServiceError_OK { if alreadyAddedErrors[*tr.Result] { me[i] = ErrTaskAlreadyAdded } else { me[i] = &internal.APIError{ Service: "taskqueue", Code: int32(*tr.Result), } } any = true } } if any { return tasksOut, me } return tasksOut, nil } // Delete deletes a task from a named queue. func Delete(c context.Context, task *Task, queueName string) error { err := DeleteMulti(c, []*Task{task}, queueName) if me, ok := err.(appengine.MultiError); ok { return me[0] } return err } // DeleteMulti deletes multiple tasks from a named queue. // If a given task could not be deleted, an appengine.MultiError is returned. func DeleteMulti(c context.Context, tasks []*Task, queueName string) error { taskNames := make([][]byte, len(tasks)) for i, t := range tasks { taskNames[i] = []byte(t.Name) } if queueName == "" { queueName = "default" } req := &pb.TaskQueueDeleteRequest{ QueueName: []byte(queueName), TaskName: taskNames, } res := &pb.TaskQueueDeleteResponse{} if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil { return err } if a, b := len(req.TaskName), len(res.Result); a != b { return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b) } me, any := make(appengine.MultiError, len(res.Result)), false for i, ec := range res.Result { if ec != pb.TaskQueueServiceError_OK { me[i] = &internal.APIError{ Service: "taskqueue", Code: int32(ec), } any = true } } if any { return me } return nil } func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) { if queueName == "" { queueName = "default" } req := &pb.TaskQueueQueryAndOwnTasksRequest{ QueueName: []byte(queueName), LeaseSeconds: proto.Float64(float64(leaseTime)), MaxTasks: proto.Int64(int64(maxTasks)), GroupByTag: proto.Bool(groupByTag), Tag: tag, } res := &pb.TaskQueueQueryAndOwnTasksResponse{} if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil { return nil, err } tasks := make([]*Task, len(res.Task)) for i, t := range res.Task { tasks[i] = &Task{ Payload: t.Body, Name: string(t.TaskName), Method: "PULL", ETA: time.Unix(0, *t.EtaUsec*1e3), RetryCount: *t.RetryCount, Tag: string(t.Tag), } } return tasks, nil } // Lease leases tasks from a queue. // leaseTime is in seconds. // The number of tasks fetched will be at most maxTasks. func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) { return lease(c, maxTasks, queueName, leaseTime, false, nil) } // LeaseByTag leases tasks from a queue, grouped by tag. // If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA. // leaseTime is in seconds. // The number of tasks fetched will be at most maxTasks. func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) { return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag)) } // Purge removes all tasks from a queue. func Purge(c context.Context, queueName string) error { if queueName == "" { queueName = "default" } req := &pb.TaskQueuePurgeQueueRequest{ QueueName: []byte(queueName), } res := &pb.TaskQueuePurgeQueueResponse{} return internal.Call(c, "taskqueue", "PurgeQueue", req, res) } // ModifyLease modifies the lease of a task. // Used to request more processing time, or to abandon processing. // leaseTime is in seconds and must not be negative. func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error { if queueName == "" { queueName = "default" } req := &pb.TaskQueueModifyTaskLeaseRequest{ QueueName: []byte(queueName), TaskName: []byte(task.Name), EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership. LeaseSeconds: proto.Float64(float64(leaseTime)), } res := &pb.TaskQueueModifyTaskLeaseResponse{} if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil { return err } task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3) return nil } // QueueStatistics represents statistics about a single task queue. type QueueStatistics struct { Tasks int // may be an approximation OldestETA time.Time // zero if there are no pending tasks Executed1Minute int // tasks executed in the last minute InFlight int // tasks executing now EnforcedRate float64 // requests per second } // QueueStats retrieves statistics about queues. func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) { req := &pb.TaskQueueFetchQueueStatsRequest{ QueueName: make([][]byte, len(queueNames)), } for i, q := range queueNames { if q == "" { q = "default" } req.QueueName[i] = []byte(q) } res := &pb.TaskQueueFetchQueueStatsResponse{} if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil { return nil, err } qs := make([]QueueStatistics, len(res.Queuestats)) for i, qsg := range res.Queuestats { qs[i] = QueueStatistics{ Tasks: int(*qsg.NumTasks), } if eta := *qsg.OldestEtaUsec; eta > -1 { qs[i].OldestETA = time.Unix(0, eta*1e3) } if si := qsg.ScannerInfo; si != nil { qs[i].Executed1Minute = int(*si.ExecutedLastMinute) qs[i].InFlight = int(si.GetRequestsInFlight()) qs[i].EnforcedRate = si.GetEnforcedRate() } } return qs, nil } func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) { x.Transaction = t } func init() { internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name) // Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue. dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT) internal.RegisterTimeoutErrorCode("taskqueue", dsCode) // Transaction registration. internal.RegisterTransactionSetter(setTransaction) internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) { for _, req := range x.AddRequest { setTransaction(req, t) } }) }