From b52243cdc29554e94b8cb2783af03a23e23e747b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 23 Jul 2013 15:32:56 +0200 Subject: [PATCH] Added cluster pending tasks api. Closes #3368 --- .../elasticsearch/action/ActionModule.java | 3 + .../tasks/PendingClusterTasksAction.java | 45 +++++++ .../tasks/PendingClusterTasksRequest.java | 33 +++++ .../PendingClusterTasksRequestBuilder.java | 39 ++++++ .../tasks/PendingClusterTasksResponse.java | 82 ++++++++++++ .../TransportPendingClusterTasksAction.java | 69 ++++++++++ .../client/ClusterAdminClient.java | 21 +++ .../support/AbstractClusterAdminClient.java | 17 +++ .../elasticsearch/cluster/ClusterService.java | 8 ++ .../service/InternalClusterService.java | 25 ++++ .../cluster/service/PendingClusterTask.java | 101 +++++++++++++++ .../rest/action/RestActionModule.java | 2 + .../tasks/RestPendingClusterTasksAction.java | 97 ++++++++++++++ .../cluster/ClusterServiceTests.java | 122 ++++++++++++++++++ 14 files changed, 664 insertions(+) create mode 100644 src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksAction.java create mode 100644 src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequest.java create mode 100644 src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java create mode 100644 src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java create mode 100644 src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java create mode 100644 src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java create mode 100644 src/main/java/org/elasticsearch/rest/action/admin/cluster/tasks/RestPendingClusterTasksAction.java diff --git a/src/main/java/org/elasticsearch/action/ActionModule.java b/src/main/java/org/elasticsearch/action/ActionModule.java index 93e7c333dd1..ff80dddebf4 100644 --- a/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/src/main/java/org/elasticsearch/action/ActionModule.java @@ -40,6 +40,8 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction; +import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistAction; @@ -186,6 +188,7 @@ public class ActionModule extends AbstractModule { registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class); + registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class); registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksAction.java new file mode 100644 index 00000000000..8d4f3f450ed --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksAction.java @@ -0,0 +1,45 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.client.ClusterAdminClient; + +/** + */ +public class PendingClusterTasksAction extends ClusterAction { + + public static final PendingClusterTasksAction INSTANCE = new PendingClusterTasksAction(); + public static final String NAME = "cluster/task"; + + private PendingClusterTasksAction() { + super(NAME); + } + + @Override + public PendingClusterTasksResponse newResponse() { + return new PendingClusterTasksResponse(); + } + + @Override + public PendingClusterTasksRequestBuilder newRequestBuilder(ClusterAdminClient client) { + return new PendingClusterTasksRequestBuilder(client); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequest.java new file mode 100644 index 00000000000..2e092d0a5e3 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequest.java @@ -0,0 +1,33 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; + +/** + */ +public class PendingClusterTasksRequest extends MasterNodeOperationRequest { + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java new file mode 100644 index 00000000000..14519d00079 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksRequestBuilder.java @@ -0,0 +1,39 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.internal.InternalClusterAdminClient; + +/** + */ +public class PendingClusterTasksRequestBuilder extends MasterNodeOperationRequestBuilder { + + public PendingClusterTasksRequestBuilder(ClusterAdminClient client) { + super((InternalClusterAdminClient) client, new PendingClusterTasksRequest()); + } + + @Override + protected void doExecute(ActionListener listener) { + ((InternalClusterAdminClient) client).pendingClusterTasks(request, listener); + } +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java new file mode 100644 index 00000000000..d250715f3fd --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java @@ -0,0 +1,82 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.service.PendingClusterTask; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + */ +public class PendingClusterTasksResponse extends ActionResponse implements Iterable { + + private List pendingTasks; + + PendingClusterTasksResponse() { + } + + PendingClusterTasksResponse(List pendingTasks) { + this.pendingTasks = pendingTasks; + } + + public List pendingTasks() { + return pendingTasks; + } + + /** + * The pending cluster tasks + */ + public List getPendingTasks() { + return pendingTasks(); + } + + @Override + public Iterator iterator() { + return pendingTasks.iterator(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + int size = in.readVInt(); + pendingTasks = new ArrayList(size); + for (int i = 0; i < size; i++) { + PendingClusterTask task = new PendingClusterTask(); + task.readFrom(in); + pendingTasks.add(task); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(pendingTasks.size()); + for (PendingClusterTask task : pendingTasks) { + task.writeTo(out); + } + } + +} diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java new file mode 100644 index 00000000000..4333e8ccc18 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java @@ -0,0 +1,69 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + */ +public class TransportPendingClusterTasksAction extends TransportMasterNodeOperationAction { + + private final ClusterService clusterService; + + @Inject + public TransportPendingClusterTasksAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(settings, transportService, clusterService, threadPool); + this.clusterService = clusterService; + } + + @Override + protected String transportAction() { + return PendingClusterTasksAction.NAME; + } + + @Override + protected String executor() { + // very lightweight operation in memory, no need to fork to a thread + return ThreadPool.Names.SAME; + } + + @Override + protected PendingClusterTasksRequest newRequest() { + return new PendingClusterTasksRequest(); + } + + @Override + protected PendingClusterTasksResponse newResponse() { + return new PendingClusterTasksResponse(); + } + + @Override + protected void masterOperation(PendingClusterTasksRequest request, ClusterState state, ActionListener listener) throws ElasticSearchException { + listener.onResponse(new PendingClusterTasksResponse(clusterService.pendingTasks())); + } +} diff --git a/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index f55d678c36c..7304c8ce6f2 100644 --- a/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -51,6 +51,9 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; /** * Administrative actions/operations against indices. @@ -259,4 +262,22 @@ public interface ClusterAdminClient { */ ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices); + /** + * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations + * that update the cluster state (for example, a create index operation) + */ + void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener listener); + + /** + * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations + * that update the cluster state (for example, a create index operation) + */ + ActionFuture pendingClusterTasks(PendingClusterTasksRequest request); + + /** + * Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations + * that update the cluster state (for example, a create index operation) + */ + PendingClusterTasksRequestBuilder preparePendingClusterTasks(); + } diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java index df491d2fd33..ff8373a3810 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClusterAdminClient.java @@ -61,6 +61,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.client.internal.InternalClusterAdminClient; /** @@ -228,5 +232,18 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin return new ClusterSearchShardsRequestBuilder(this).setIndices(indices); } + @Override + public PendingClusterTasksRequestBuilder preparePendingClusterTasks() { + return new PendingClusterTasksRequestBuilder(this); + } + @Override + public ActionFuture pendingClusterTasks(PendingClusterTasksRequest request) { + return execute(PendingClusterTasksAction.INSTANCE, request); + } + + @Override + public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener listener) { + execute(PendingClusterTasksAction.INSTANCE, request, listener); + } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index af9c09ae8f8..045beaf72b1 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -23,10 +23,13 @@ import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; +import java.util.List; + /** * The cluster service allowing to both register for cluster state events ({@link ClusterStateListener}) * and submit state update tasks ({@link ClusterStateUpdateTask}. @@ -97,4 +100,9 @@ public interface ClusterService extends LifecycleComponent { * Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}). */ void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask); + + /** + * Returns the tasks that are pending. + */ + List pendingTasks(); } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index a83819a7b63..40e447a4dd3 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -45,6 +46,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Queue; @@ -236,10 +238,33 @@ public class InternalClusterService extends AbstractLifecycleComponent pendingTasks() { + long now = System.currentTimeMillis(); + PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending(); + List pendingClusterTasks = new ArrayList(pendings.length); + for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) { + final String source; + final long timeInQueue; + if (pending.task instanceof UpdateTask) { + UpdateTask updateTask = (UpdateTask) pending.task; + source = updateTask.source; + timeInQueue = now - updateTask.addedAt; + } else { + source = "unknown"; + timeInQueue = -1; + } + + pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue)); + } + return pendingClusterTasks; + } + class UpdateTask extends PrioritizedRunnable { public final String source; public final ClusterStateUpdateTask updateTask; + public final long addedAt = System.currentTimeMillis(); UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) { super(priority); diff --git a/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java b/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java new file mode 100644 index 00000000000..d89ecc2d683 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java @@ -0,0 +1,101 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.service; + +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; + +/** + */ +public class PendingClusterTask implements Streamable { + + private long insertOrder; + private Priority priority; + private Text source; + private long timeInQueue; + + public PendingClusterTask() { + } + + public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue) { + this.insertOrder = insertOrder; + this.priority = priority; + this.source = source; + this.timeInQueue = timeInQueue; + } + + public long insertOrder() { + return insertOrder; + } + + public long getInsertOrder() { + return insertOrder(); + } + + public Priority priority() { + return priority; + } + + public Priority getPriority() { + return priority(); + } + + public Text source() { + return source; + } + + public Text getSource() { + return source(); + } + + public long timeInQueueInMillis() { + return timeInQueue; + } + + public long getTimeInQueueInMillis() { + return timeInQueueInMillis(); + } + + public TimeValue getTimeInQueue() { + return new TimeValue(getTimeInQueueInMillis()); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + insertOrder = in.readVLong(); + priority = Priority.fromByte(in.readByte()); + source = in.readText(); + timeInQueue = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(insertOrder); + out.writeByte(priority.value()); + out.writeText(source); + out.writeVLong(timeInQueue); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 5e9cfe6b1ee..dcb9f9b4bef 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -33,6 +33,7 @@ import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettin import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction; import org.elasticsearch.rest.action.admin.cluster.shards.RestClusterSearchShardsAction; import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction; +import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction; import org.elasticsearch.rest.action.admin.indices.alias.RestGetIndicesAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction; import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction; @@ -121,6 +122,7 @@ public class RestActionModule extends AbstractModule { bind(RestClusterGetSettingsAction.class).asEagerSingleton(); bind(RestClusterRerouteAction.class).asEagerSingleton(); bind(RestClusterSearchShardsAction.class).asEagerSingleton(); + bind(RestPendingClusterTasksAction.class).asEagerSingleton(); bind(RestIndicesExistsAction.class).asEagerSingleton(); bind(RestTypesExistsAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/cluster/tasks/RestPendingClusterTasksAction.java b/src/main/java/org/elasticsearch/rest/action/admin/cluster/tasks/RestPendingClusterTasksAction.java new file mode 100644 index 00000000000..4f4f574e5ab --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/admin/cluster/tasks/RestPendingClusterTasksAction.java @@ -0,0 +1,97 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster.tasks; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.PendingClusterTask; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; + +/** + */ +public class RestPendingClusterTasksAction extends BaseRestHandler { + + @Inject + public RestPendingClusterTasksAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(RestRequest.Method.GET, "/_cluster/pending_tasks", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(); + client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new ActionListener() { + + @Override + public void onResponse(PendingClusterTasksResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + builder.startArray(Fields.TASKS); + for (PendingClusterTask pendingClusterTask : response) { + builder.startObject(); + builder.field(Fields.INSERT_ORDER, pendingClusterTask.insertOrder()); + builder.field(Fields.PRIORITY, pendingClusterTask.priority()); + builder.field(Fields.SOURCE, pendingClusterTask.source()); + builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.timeInQueueInMillis()); + builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue()); + builder.endObject(); + } + builder.endArray(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + if (logger.isDebugEnabled()) { + logger.debug("failed to get pending cluster tasks", e); + } + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + static final class Fields { + + static final XContentBuilderString TASKS = new XContentBuilderString("tasks"); + static final XContentBuilderString INSERT_ORDER = new XContentBuilderString("insert_order"); + static final XContentBuilderString PRIORITY = new XContentBuilderString("proirity"); + static final XContentBuilderString SOURCE = new XContentBuilderString("source"); + static final XContentBuilderString TIME_IN_QUEUE_MILLIS = new XContentBuilderString("time_in_queue_millis"); + static final XContentBuilderString TIME_IN_QUEUE = new XContentBuilderString("time_in_queue"); + + } +} diff --git a/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java index 31185c2c780..a1badc79b5f 100644 --- a/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java @@ -21,7 +21,10 @@ package org.elasticsearch.test.integration.cluster; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent; @@ -114,6 +117,125 @@ public class ClusterServiceTests extends AbstractZenNodesTests { assertThat(executeCalled.get(), equalTo(false)); } + @Test + public void testPendingUpdateTask() throws Exception { + InternalNode node1 = (InternalNode) startNode("node1"); + Client client = startNode("client-node", settingsBuilder().put("node.client", true).build()).client(); + + ClusterService clusterService = node1.injector().getInstance(ClusterService.class); + final CountDownLatch block1 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block1.await(); + } catch (InterruptedException e) { + assert false; + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + assert false; + } + }); + + for (int i = 2; i <= 10; i++) { + clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block1.await(); + } catch (InterruptedException e) { + assert false; + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + assert false; + } + }); + } + + List pendingClusterTasks = clusterService.pendingTasks(); + assertThat(pendingClusterTasks.size(), equalTo(9)); + int counter = 2; + for (PendingClusterTask task : pendingClusterTasks) { + assertThat(task.source().string(), equalTo("" + counter++)); + } + + PendingClusterTasksResponse response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + assertThat(response.pendingTasks().size(), equalTo(9)); + counter = 2; + for (PendingClusterTask task : response) { + assertThat(task.source().string(), equalTo("" + counter++)); + } + block1.countDown(); + Thread.sleep(500); + + pendingClusterTasks = clusterService.pendingTasks(); + assertThat(pendingClusterTasks, empty()); + response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + assertThat(response.pendingTasks(), empty()); + + final CountDownLatch block2 = new CountDownLatch(1); + clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block2.await(); + } catch (InterruptedException e) { + assert false; + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + assert false; + } + }); + + for (int i = 2; i <= 5; i++) { + clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block2.await(); + } catch (InterruptedException e) { + assert false; + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + assert false; + } + }); + } + Thread.sleep(100); + + pendingClusterTasks = clusterService.pendingTasks(); + assertThat(pendingClusterTasks.size(), equalTo(4)); + counter = 2; + for (PendingClusterTask task : pendingClusterTasks) { + assertThat(task.source().string(), equalTo("" + counter++)); + } + + response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet(); + assertThat(response.pendingTasks().size(), equalTo(4)); + counter = 2; + for (PendingClusterTask task : response) { + assertThat(task.source().string(), equalTo("" + counter++)); + assertThat(task.getTimeInQueueInMillis(), greaterThan(0l)); + } + block2.countDown(); + } + @Test public void testListenerCallbacks() throws Exception { Settings settings = settingsBuilder()