Added cluster pending tasks api.

Closes #3368
This commit is contained in:
Martijn van Groningen 2013-07-23 15:32:56 +02:00
parent 69a7f8d71d
commit b52243cdc2
14 changed files with 664 additions and 0 deletions

View File

@ -40,6 +40,8 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistAction;
@ -186,6 +188,7 @@ public class ActionModule extends AbstractModule {
registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class);

View File

@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.client.ClusterAdminClient;
/**
*/
public class PendingClusterTasksAction extends ClusterAction<PendingClusterTasksRequest, PendingClusterTasksResponse, PendingClusterTasksRequestBuilder> {
public static final PendingClusterTasksAction INSTANCE = new PendingClusterTasksAction();
public static final String NAME = "cluster/task";
private PendingClusterTasksAction() {
super(NAME);
}
@Override
public PendingClusterTasksResponse newResponse() {
return new PendingClusterTasksResponse();
}
@Override
public PendingClusterTasksRequestBuilder newRequestBuilder(ClusterAdminClient client) {
return new PendingClusterTasksRequestBuilder(client);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
/**
*/
public class PendingClusterTasksRequest extends MasterNodeOperationRequest<PendingClusterTasksRequest> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
/**
*/
public class PendingClusterTasksRequestBuilder extends MasterNodeOperationRequestBuilder<PendingClusterTasksRequest, PendingClusterTasksResponse, PendingClusterTasksRequestBuilder> {
public PendingClusterTasksRequestBuilder(ClusterAdminClient client) {
super((InternalClusterAdminClient) client, new PendingClusterTasksRequest());
}
@Override
protected void doExecute(ActionListener<PendingClusterTasksResponse> listener) {
((InternalClusterAdminClient) client).pendingClusterTasks(request, listener);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
*/
public class PendingClusterTasksResponse extends ActionResponse implements Iterable<PendingClusterTask> {
private List<PendingClusterTask> pendingTasks;
PendingClusterTasksResponse() {
}
PendingClusterTasksResponse(List<PendingClusterTask> pendingTasks) {
this.pendingTasks = pendingTasks;
}
public List<PendingClusterTask> pendingTasks() {
return pendingTasks;
}
/**
* The pending cluster tasks
*/
public List<PendingClusterTask> getPendingTasks() {
return pendingTasks();
}
@Override
public Iterator<PendingClusterTask> iterator() {
return pendingTasks.iterator();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
pendingTasks = new ArrayList<PendingClusterTask>(size);
for (int i = 0; i < size; i++) {
PendingClusterTask task = new PendingClusterTask();
task.readFrom(in);
pendingTasks.add(task);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pendingTasks.size());
for (PendingClusterTask task : pendingTasks) {
task.writeTo(out);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*/
public class TransportPendingClusterTasksAction extends TransportMasterNodeOperationAction<PendingClusterTasksRequest, PendingClusterTasksResponse> {
private final ClusterService clusterService;
@Inject
public TransportPendingClusterTasksAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
this.clusterService = clusterService;
}
@Override
protected String transportAction() {
return PendingClusterTasksAction.NAME;
}
@Override
protected String executor() {
// very lightweight operation in memory, no need to fork to a thread
return ThreadPool.Names.SAME;
}
@Override
protected PendingClusterTasksRequest newRequest() {
return new PendingClusterTasksRequest();
}
@Override
protected PendingClusterTasksResponse newResponse() {
return new PendingClusterTasksResponse();
}
@Override
protected void masterOperation(PendingClusterTasksRequest request, ClusterState state, ActionListener<PendingClusterTasksResponse> listener) throws ElasticSearchException {
listener.onResponse(new PendingClusterTasksResponse(clusterService.pendingTasks()));
}
}

View File

@ -51,6 +51,9 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
/**
* Administrative actions/operations against indices.
@ -259,4 +262,22 @@ public interface ClusterAdminClient {
*/
ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices);
/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener);
/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
ActionFuture<PendingClusterTasksResponse> pendingClusterTasks(PendingClusterTasksRequest request);
/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
PendingClusterTasksRequestBuilder preparePendingClusterTasks();
}

View File

@ -61,6 +61,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
/**
@ -228,5 +232,18 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin
return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
}
@Override
public PendingClusterTasksRequestBuilder preparePendingClusterTasks() {
return new PendingClusterTasksRequestBuilder(this);
}
@Override
public ActionFuture<PendingClusterTasksResponse> pendingClusterTasks(PendingClusterTasksRequest request) {
return execute(PendingClusterTasksAction.INSTANCE, request);
}
@Override
public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener) {
execute(PendingClusterTasksAction.INSTANCE, request, listener);
}
}

View File

@ -23,10 +23,13 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
/**
* The cluster service allowing to both register for cluster state events ({@link ClusterStateListener})
* and submit state update tasks ({@link ClusterStateUpdateTask}.
@ -97,4 +100,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
*/
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);
/**
* Returns the tasks that are pending.
*/
List<PendingClusterTask> pendingTasks();
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -45,6 +46,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
@ -236,10 +238,33 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
@Override
public List<PendingClusterTask> pendingTasks() {
long now = System.currentTimeMillis();
PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending();
List<PendingClusterTask> pendingClusterTasks = new ArrayList<PendingClusterTask>(pendings.length);
for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) {
final String source;
final long timeInQueue;
if (pending.task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) pending.task;
source = updateTask.source;
timeInQueue = now - updateTask.addedAt;
} else {
source = "unknown";
timeInQueue = -1;
}
pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue));
}
return pendingClusterTasks;
}
class UpdateTask extends PrioritizedRunnable {
public final String source;
public final ClusterStateUpdateTask updateTask;
public final long addedAt = System.currentTimeMillis();
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority);

View File

@ -0,0 +1,101 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/**
*/
public class PendingClusterTask implements Streamable {
private long insertOrder;
private Priority priority;
private Text source;
private long timeInQueue;
public PendingClusterTask() {
}
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue) {
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
this.timeInQueue = timeInQueue;
}
public long insertOrder() {
return insertOrder;
}
public long getInsertOrder() {
return insertOrder();
}
public Priority priority() {
return priority;
}
public Priority getPriority() {
return priority();
}
public Text source() {
return source;
}
public Text getSource() {
return source();
}
public long timeInQueueInMillis() {
return timeInQueue;
}
public long getTimeInQueueInMillis() {
return timeInQueueInMillis();
}
public TimeValue getTimeInQueue() {
return new TimeValue(getTimeInQueueInMillis());
}
@Override
public void readFrom(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.fromByte(in.readByte());
source = in.readText();
timeInQueue = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(insertOrder);
out.writeByte(priority.value());
out.writeText(source);
out.writeVLong(timeInQueue);
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettin
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.shards.RestClusterSearchShardsAction;
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.tasks.RestPendingClusterTasksAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestGetIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.delete.RestIndexDeleteAliasesAction;
@ -121,6 +122,7 @@ public class RestActionModule extends AbstractModule {
bind(RestClusterGetSettingsAction.class).asEagerSingleton();
bind(RestClusterRerouteAction.class).asEagerSingleton();
bind(RestClusterSearchShardsAction.class).asEagerSingleton();
bind(RestPendingClusterTasksAction.class).asEagerSingleton();
bind(RestIndicesExistsAction.class).asEagerSingleton();
bind(RestTypesExistsAction.class).asEagerSingleton();

View File

@ -0,0 +1,97 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.tasks;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
/**
*/
public class RestPendingClusterTasksAction extends BaseRestHandler {
@Inject
public RestPendingClusterTasksAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/pending_tasks", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new ActionListener<PendingClusterTasksResponse>() {
@Override
public void onResponse(PendingClusterTasksResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.startArray(Fields.TASKS);
for (PendingClusterTask pendingClusterTask : response) {
builder.startObject();
builder.field(Fields.INSERT_ORDER, pendingClusterTask.insertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.priority());
builder.field(Fields.SOURCE, pendingClusterTask.source());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.timeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.endObject();
}
builder.endArray();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
if (logger.isDebugEnabled()) {
logger.debug("failed to get pending cluster tasks", e);
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
static final class Fields {
static final XContentBuilderString TASKS = new XContentBuilderString("tasks");
static final XContentBuilderString INSERT_ORDER = new XContentBuilderString("insert_order");
static final XContentBuilderString PRIORITY = new XContentBuilderString("proirity");
static final XContentBuilderString SOURCE = new XContentBuilderString("source");
static final XContentBuilderString TIME_IN_QUEUE_MILLIS = new XContentBuilderString("time_in_queue_millis");
static final XContentBuilderString TIME_IN_QUEUE = new XContentBuilderString("time_in_queue");
}
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.test.integration.cluster;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent;
@ -114,6 +117,125 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
assertThat(executeCalled.get(), equalTo(false));
}
@Test
public void testPendingUpdateTask() throws Exception {
InternalNode node1 = (InternalNode) startNode("node1");
Client client = startNode("client-node", settingsBuilder().put("node.client", true).build()).client();
ClusterService clusterService = node1.injector().getInstance(ClusterService.class);
final CountDownLatch block1 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block1.await();
} catch (InterruptedException e) {
assert false;
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
});
for (int i = 2; i <= 10; i++) {
clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block1.await();
} catch (InterruptedException e) {
assert false;
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
});
}
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(9));
int counter = 2;
for (PendingClusterTask task : pendingClusterTasks) {
assertThat(task.source().string(), equalTo("" + counter++));
}
PendingClusterTasksResponse response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(9));
counter = 2;
for (PendingClusterTask task : response) {
assertThat(task.source().string(), equalTo("" + counter++));
}
block1.countDown();
Thread.sleep(500);
pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks, empty());
response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks(), empty());
final CountDownLatch block2 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block2.await();
} catch (InterruptedException e) {
assert false;
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
});
for (int i = 2; i <= 5; i++) {
clusterService.submitStateUpdateTask(Integer.toString(i), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block2.await();
} catch (InterruptedException e) {
assert false;
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
});
}
Thread.sleep(100);
pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(4));
counter = 2;
for (PendingClusterTask task : pendingClusterTasks) {
assertThat(task.source().string(), equalTo("" + counter++));
}
response = client.admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(4));
counter = 2;
for (PendingClusterTask task : response) {
assertThat(task.source().string(), equalTo("" + counter++));
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
}
block2.countDown();
}
@Test
public void testListenerCallbacks() throws Exception {
Settings settings = settingsBuilder()