added priority support for cluster state updates:

* URGENT:
    * cluster_reroute (api)
    * refresh-mapping
    * cluster_update_settings
    * reroute_after_cluster_update_settings
    * create-index
    * delete-index
    * index-aliases
    * remove-index-template
    * create-index-template
    * update-mapping
    * remove-mapping
    * put-mapping
    * open-index
    * close-index
    * update-settings

* HIGH
    * routing-table-updater
    * zen-disco-node_left
    * zen-disco-master_failed
    * shard-failed
    * shard-started

* NORMAL
    * all other actions
This commit is contained in:
uboness 2013-02-05 22:17:49 +01:00
parent f5331c9535
commit 6d9048f8cc
20 changed files with 547 additions and 22 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -75,7 +76,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>(); final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
try { try {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -80,7 +81,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("cluster_update_settings", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
try { try {
@ -137,7 +138,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
@Override @Override
public void clusterStateProcessed(ClusterState clusterState) { public void clusterStateProcessed(ClusterState clusterState) {
// now, reroute // now, reroute
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
try { try {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -90,5 +91,10 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
/** /**
* Submits a task that will update the cluster state. * Submits a task that will update the cluster state.
*/ */
void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask);
/**
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
*/
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask); void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -108,7 +109,7 @@ public class ShardStateAction extends AbstractComponent {
private void innerShardFailed(final ShardRouting shardRouting, final String reason) { private void innerShardFailed(final ShardRouting shardRouting, final String reason) {
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason); logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -136,7 +137,7 @@ public class ShardStateAction extends AbstractComponent {
// process started events as fast as possible, to make shards available // process started events as fast as possible, to make shards available
startedShardsQueue.add(shardRouting); startedShardsQueue.add(shardRouting);
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
@ -131,7 +132,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener); final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener);
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
boolean indexCreated = false; boolean indexCreated = false;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction; import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -69,7 +70,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
} }
public void indicesAliases(final Request request, final Listener listener) { public void indicesAliases(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("index-aliases", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -50,7 +51,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
} }
public void removeTemplate(final RemoveRequest request, final RemoveListener listener) { public void removeTemplate(final RemoveRequest request, final RemoveListener listener) {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
if (!currentState.metaData().templates().containsKey(request.name)) { if (!currentState.metaData().templates().containsKey(request.name)) {
@ -115,7 +116,7 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
} }
final IndexTemplateMetaData template = templateBuilder.build(); final IndexTemplateMetaData template = templateBuilder.build();
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
if (request.create && currentState.metaData().templates().containsKey(request.name)) { if (request.create && currentState.metaData().templates().containsKey(request.name)) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -90,7 +91,7 @@ public class MetaDataMappingService extends AbstractComponent {
} }
sTypes.addAll(Arrays.asList(types)); sTypes.addAll(Arrays.asList(types));
} }
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
boolean createdIndex = false; boolean createdIndex = false;
@ -155,7 +156,7 @@ public class MetaDataMappingService extends AbstractComponent {
} }
public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) { public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) {
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
boolean createdIndex = false; boolean createdIndex = false;
@ -222,7 +223,7 @@ public class MetaDataMappingService extends AbstractComponent {
public void removeMapping(final RemoveRequest request, final Listener listener) { public void removeMapping(final RemoveRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
if (request.indices.length == 0) { if (request.indices.length == 0) {
@ -272,7 +273,7 @@ public class MetaDataMappingService extends AbstractComponent {
public void putMapping(final PutRequest request, final Listener listener) { public void putMapping(final PutRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
List<String> indicesToClose = Lists.newArrayList(); List<String> indicesToClose = Lists.newArrayList();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -57,7 +58,7 @@ public class MetaDataStateIndexService extends AbstractComponent {
} }
public void closeIndex(final Request request, final Listener listener) { public void closeIndex(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("close-index [" + request.index + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("close-index [" + request.index + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
@ -100,7 +101,7 @@ public class MetaDataStateIndexService extends AbstractComponent {
} }
public void openIndex(final Request request, final Listener listener) { public void openIndex(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("open-index [" + request.index + "]", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("open-index [" + request.index + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -149,7 +150,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
} }
final Settings openSettings = updatedSettingsBuilder.build(); final Settings openSettings = updatedSettingsBuilder.build();
clusterService.submitStateUpdateTask("update-settings", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
try { try {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -138,7 +139,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
if (lifecycle.stopped()) { if (lifecycle.stopped()) {
return; return;
} }
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, Priority.HIGH, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState); RoutingAllocation.Result routingResult = allocationService.reroute(currentState);

View File

@ -29,11 +29,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
@ -45,7 +48,6 @@ import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.*; import java.util.concurrent.*;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.elasticsearch.cluster.ClusterState.Builder; import static org.elasticsearch.cluster.ClusterState.Builder;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -114,7 +116,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() throws ElasticSearchException { protected void doStart() throws ElasticSearchException {
add(localNodeMasterListeners); add(localNodeMasterListeners);
this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build(); this.clusterState = newClusterStateBuilder().blocks(initialBlocks).build();
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask")); this.updateTasksExecutor = EsExecutors.newSinglePrioritizingThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes()); this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
} }
@ -206,10 +208,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
} }
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) { public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
submitStateUpdateTask(source, Priority.NORMAL, updateTask);
}
public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
updateTasksExecutor.execute(new Runnable() { updateTasksExecutor.execute(new PrioritizedRunnable(priority) {
@Override @Override
public void run() { public void run() {
if (!lifecycle.started()) { if (!lifecycle.started()) {

View File

@ -0,0 +1,75 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
/**
*
*/
public final class Priority implements Comparable<Priority> {
public static Priority URGENT = new Priority((byte) 0);
public static Priority HIGH = new Priority((byte) 1);
public static Priority NORMAL = new Priority((byte) 2);
public static Priority LOW = new Priority((byte) 3);
public static Priority LANGUID = new Priority((byte) 4);
private final byte value;
private Priority(byte value) {
this.value = value;
}
public int compareTo(Priority p) {
return this.value - p.value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || Priority.class != o.getClass()) return false;
Priority priority = (Priority) o;
if (value != priority.value) return false;
return true;
}
@Override
public int hashCode() {
return (int) value;
}
@Override
public String toString() {
switch (value) {
case (byte) 0:
return "URGENT";
case (byte) 1:
return "HIGH";
case (byte) 2:
return "NORMAL";
case (byte) 3:
return "LOW";
default:
return "LANGUID";
}
}
}

View File

@ -31,6 +31,10 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class EsExecutors { public class EsExecutors {
public static EsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
}
public static EsThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit, public static EsThreadPoolExecutor newScalingExecutorService(int min, int max, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory) { ThreadFactory threadFactory) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>(); ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<Runnable>();

View File

@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.Priority;
import java.util.concurrent.Callable;
/**
*
*/
public abstract class PrioritizedCallable<T> implements Callable<T>, Comparable<PrioritizedCallable> {
private final Priority priority;
public static <T> PrioritizedCallable<T> wrap(Callable<T> callable, Priority priority) {
return new Wrapped<T>(callable, priority);
}
protected PrioritizedCallable(Priority priority) {
this.priority = priority;
}
@Override
public int compareTo(PrioritizedCallable pc) {
return priority.compareTo(pc.priority);
}
public Priority priority() {
return priority;
}
static class Wrapped<T> extends PrioritizedCallable<T> {
private final Callable<T> callable;
private Wrapped(Callable<T> callable, Priority priority) {
super(priority);
this.callable = callable;
}
@Override
public T call() throws Exception {
return callable.call();
}
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.Priority;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* A prioritizing executor which uses a priority queue as a work queue. The jobs that will be submitted will be treated
* as {@link PrioritizedRunnable} and/or {@link PrioritizedCallable}, those tasks that are not instances of these two will
* be wrapped and assign a default {@link Priority#NORMAL} priority.
*
* Note, if two tasks have the same priority, the first to arrive will be executed first (FIFO style).
*/
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong tieBreaker = new AtomicLong(Long.MIN_VALUE);
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
}
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory, handler);
}
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int initialWorkQueuSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(initialWorkQueuSize), threadFactory, handler);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
return new PrioritizedFutureTask<T>((PrioritizedRunnable) runnable, value, tieBreaker.incrementAndGet());
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
return new PrioritizedFutureTask<T>((PrioritizedCallable<T>) callable, tieBreaker.incrementAndGet());
}
/**
*
*/
static class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
private final Priority priority;
private final long tieBreaker;
public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long tieBreaker) {
super(runnable, value);
this.priority = runnable.priority();
this.tieBreaker = tieBreaker;
}
public PrioritizedFutureTask(PrioritizedCallable<T> callable, long tieBreaker) {
super(callable);
this.priority = callable.priority();
this.tieBreaker = tieBreaker;
}
@Override
public int compareTo(PrioritizedFutureTask pft) {
int res = priority.compareTo(pft.priority);
if (res != 0) {
return res;
}
return tieBreaker < pft.tieBreaker ? -1 : 1;
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.Priority;
/**
*
*/
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
private final Priority priority;
public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) {
return new Wrapped(runnable, priority);
}
protected PrioritizedRunnable(Priority priority) {
this.priority = priority;
}
@Override
public int compareTo(PrioritizedRunnable pr) {
return priority.compareTo(pr.priority);
}
public Priority priority() {
return priority;
}
static class Wrapped extends PrioritizedRunnable {
private final Runnable runnable;
private Wrapped(Runnable runnable, Priority priority) {
super(priority);
this.runnable = runnable;
}
@Override
public void run() {
runnable.run();
}
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUID; import org.elasticsearch.common.UUID;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
@ -367,7 +368,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return; return;
} }
if (master) { if (master) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.HIGH, new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder() DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
@ -462,7 +463,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.info("master_left [{}], reason [{}]", masterNode, reason); logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) { if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {

View File

@ -0,0 +1,201 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.util.concurrent;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedCallable;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*
*/
public class PrioritizedExecutorsTests {
@Test
public void testPriorityQueue() throws Exception {
PriorityBlockingQueue<Priority> queue = new PriorityBlockingQueue<Priority>();
queue.add(Priority.LANGUID);
queue.add(Priority.NORMAL);
queue.add(Priority.HIGH);
queue.add(Priority.LOW);
queue.add(Priority.URGENT);
assertThat(queue.poll(), equalTo(Priority.URGENT));
assertThat(queue.poll(), equalTo(Priority.HIGH));
assertThat(queue.poll(), equalTo(Priority.NORMAL));
assertThat(queue.poll(), equalTo(Priority.LOW));
assertThat(queue.poll(), equalTo(Priority.LANGUID));
}
@Test
public void testPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new Job(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
executor.submit(new Job(1, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new Job(0, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new Job(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
assertThat(results.get(3), equalTo(3));
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
}
@Test
public void testPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new CallableJob(4, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch));
executor.submit(new CallableJob(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch));
executor.submit(new CallableJob(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
assertThat(results.get(3), equalTo(3));
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
}
@Test
public void testPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
assertThat(results.get(3), equalTo(3));
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
}
static class AwaitingJob extends PrioritizedRunnable {
private final CountDownLatch latch;
private AwaitingJob(CountDownLatch latch) {
super(Priority.URGENT);
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Job extends PrioritizedRunnable {
private final int result;
private final List<Integer> results;
private final CountDownLatch latch;
Job(int result, Priority priority, List<Integer> results, CountDownLatch latch) {
super(priority);
this.result = result;
this.results = results;
this.latch = latch;
}
@Override
public void run() {
results.add(result);
latch.countDown();
}
}
static class CallableJob extends PrioritizedCallable<Integer> {
private final int result;
private final List<Integer> results;
private final CountDownLatch latch;
CallableJob(int result, Priority priority, List<Integer> results, CountDownLatch latch) {
super(priority);
this.result = result;
this.results = results;
this.latch = latch;
}
@Override
public Integer call() throws Exception {
results.add(result);
latch.countDown();
return result;
}
}
}