diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 539187c8fa2..5a58a5a9689 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; @@ -332,6 +333,18 @@ public abstract class TransportShardReplicationOperationAction { void add(TimeValue timeout, TimeoutClusterStateListener listener); - void remove(TimeoutClusterStateListener listener); - void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateListener.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateListener.java index db47c0cc8bf..7e4e8291ba2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateListener.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateListener.java @@ -22,9 +22,15 @@ package org.elasticsearch.cluster; import org.elasticsearch.common.unit.TimeValue; /** - * @author kimchy (Shay Banon) + * An exception to cluster state listener that allows for timeouts and for post added notifications. + * + * @author kimchy (shay.banon) */ public interface TimeoutClusterStateListener extends ClusterStateListener { + void postAdded(); + + void onClose(); + void onTimeout(TimeValue timeout); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 3382ffe5474..b782b9c5c73 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -23,23 +23,27 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.timer.Timeout; +import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportService; import java.util.List; +import java.util.Queue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static java.util.concurrent.Executors.*; import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** @@ -47,10 +51,10 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*; */ public class InternalClusterService extends AbstractLifecycleComponent implements ClusterService { - private final TimeValue timeoutInterval; - private final ThreadPool threadPool; + private final TimerService timerService; + private final DiscoveryService discoveryService; private final TransportService transportService; @@ -59,45 +63,28 @@ public class InternalClusterService extends AbstractLifecycleComponent clusterStateListeners = new CopyOnWriteArrayList(); - private final List clusterStateTimeoutListeners = new CopyOnWriteArrayList(); - - private volatile ScheduledFuture scheduledFuture; + private final Queue> onGoingTimeouts = new LinkedTransferQueue>(); private volatile ClusterState clusterState = newClusterStateBuilder().build(); - @Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) { + @Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool, + TimerService timerService) { super(settings); this.transportService = transportService; this.discoveryService = discoveryService; this.threadPool = threadPool; - - this.timeoutInterval = componentSettings.getAsTime("timeout_interval", timeValueMillis(500)); + this.timerService = timerService; } @Override protected void doStart() throws ElasticSearchException { this.clusterState = newClusterStateBuilder().build(); this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask")); - scheduledFuture = threadPool.scheduleWithFixedDelay(new Runnable() { - @Override public void run() { - long timestamp = System.currentTimeMillis(); - for (final TimeoutHolder holder : clusterStateTimeoutListeners) { - if ((timestamp - holder.timestamp) > holder.timeout.millis()) { - clusterStateTimeoutListeners.remove(holder); - InternalClusterService.this.threadPool.execute(new Runnable() { - @Override public void run() { - holder.listener.onTimeout(holder.timeout); - } - }); - } - } - } - }, timeoutInterval); } @Override protected void doStop() throws ElasticSearchException { - scheduledFuture.cancel(false); - for (TimeoutHolder holder : clusterStateTimeoutListeners) { - holder.listener.onTimeout(holder.timeout); + for (Tuple onGoingTimeout : onGoingTimeouts) { + onGoingTimeout.v1().cancel(); + onGoingTimeout.v2().onClose(); } updateTasksExecutor.shutdown(); try { @@ -122,12 +109,16 @@ public class InternalClusterService extends AbstractLifecycleComponent(timerTimeout, listener)); + clusterStateListeners.add(listener); + // call the post added notification on the same event thread + updateTasksExecutor.execute(new Runnable() { + @Override public void run() { + listener.postAdded(); + } + }); } public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) { @@ -194,9 +185,6 @@ public class InternalClusterService extends AbstractLifecycleComponent implem @Override protected void doStop() throws ElasticSearchException { transports.remove(localAddress); // now, go over all the transports connected to me, and raise disconnected event - for (LocalTransport targetTransport : transports.values()) { - for (Map.Entry entry : targetTransport.connectedNodes.entrySet()) { + for (final LocalTransport targetTransport : transports.values()) { + for (final Map.Entry entry : targetTransport.connectedNodes.entrySet()) { if (entry.getValue() == this) { - targetTransport.disconnectFromNode(entry.getKey()); + targetTransport.threadPool().cached().execute(new Runnable() { + @Override public void run() { + targetTransport.disconnectFromNode(entry.getKey()); + } + }); } } } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java index cede7f0e099..967180e807f 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/datanode/SimpleDataNodesTests.java @@ -33,7 +33,7 @@ import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SimpleDataNodesTests extends AbstractNodesTests {