improve timeout cluster service listener
This commit is contained in:
parent
4ac1409afa
commit
3c8cf68a17
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
@ -332,6 +333,18 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
// make it threaded operation so we fork on the discovery listener thread
|
||||
request.operationThreaded(true);
|
||||
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
|
||||
@Override public void postAdded() {
|
||||
if (start(true)) {
|
||||
// if we managed to start and perform the operation on the primary, we can remove this listener
|
||||
clusterService.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onClose() {
|
||||
clusterService.remove(this);
|
||||
listener.onFailure(new ElasticSearchIllegalStateException("node is shutting down"));
|
||||
}
|
||||
|
||||
@Override public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (start(true)) {
|
||||
// if we managed to start and perform the operation on the primary, we can remove this listener
|
||||
|
@ -345,6 +358,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
clusterService.remove(this);
|
||||
return;
|
||||
}
|
||||
clusterService.remove(this);
|
||||
final PrimaryNotStartedActionException failure = new PrimaryNotStartedActionException(shardId, "Timeout waiting for [" + timeValue + "]");
|
||||
if (request.listenerThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
|
|
|
@ -35,7 +35,5 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
|
|||
|
||||
void add(TimeValue timeout, TimeoutClusterStateListener listener);
|
||||
|
||||
void remove(TimeoutClusterStateListener listener);
|
||||
|
||||
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);
|
||||
}
|
||||
|
|
|
@ -22,9 +22,15 @@ package org.elasticsearch.cluster;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* An extension to the cluster state listener that allows for timeouts and for post-added notifications.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
// Cluster state listener that is registered together with a timeout and receives extra lifecycle callbacks.
public interface TimeoutClusterStateListener extends ClusterStateListener {
|
||||
|
||||
// Called after the listener has been registered; InternalClusterService.add dispatches this
// on its update-task executor so it runs on the same thread as cluster change events.
void postAdded();
|
||||
|
||||
// Called by InternalClusterService.doStop for listeners whose timeout is still pending,
// right after the pending timer has been cancelled.
void onClose();
|
||||
|
||||
// Called when the timer for this listener fires without having been cancelled; receives the
// timeout value supplied at registration. The listener is expected to remove itself if needed.
void onTimeout(TimeValue timeout);
|
||||
}
|
||||
|
|
|
@ -23,23 +23,27 @@ import org.elasticsearch.ElasticSearchException;
|
|||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.timer.Timeout;
|
||||
import org.elasticsearch.common.timer.TimerTask;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.timer.TimerService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.concurrent.Executors.*;
|
||||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
|
||||
|
||||
/**
|
||||
|
@ -47,10 +51,10 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
|
|||
*/
|
||||
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
|
||||
|
||||
private final TimeValue timeoutInterval;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final TimerService timerService;
|
||||
|
||||
private final DiscoveryService discoveryService;
|
||||
|
||||
private final TransportService transportService;
|
||||
|
@ -59,45 +63,28 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
|
||||
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
|
||||
|
||||
private final List<TimeoutHolder> clusterStateTimeoutListeners = new CopyOnWriteArrayList<TimeoutHolder>();
|
||||
|
||||
private volatile ScheduledFuture scheduledFuture;
|
||||
private final Queue<Tuple<Timeout, TimeoutClusterStateListener>> onGoingTimeouts = new LinkedTransferQueue<Tuple<Timeout, TimeoutClusterStateListener>>();
|
||||
|
||||
private volatile ClusterState clusterState = newClusterStateBuilder().build();
|
||||
|
||||
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool) {
|
||||
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, TransportService transportService, ThreadPool threadPool,
|
||||
TimerService timerService) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.discoveryService = discoveryService;
|
||||
this.threadPool = threadPool;
|
||||
|
||||
this.timeoutInterval = componentSettings.getAsTime("timeout_interval", timeValueMillis(500));
|
||||
this.timerService = timerService;
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
this.clusterState = newClusterStateBuilder().build();
|
||||
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "clusterService#updateTask"));
|
||||
scheduledFuture = threadPool.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override public void run() {
|
||||
long timestamp = System.currentTimeMillis();
|
||||
for (final TimeoutHolder holder : clusterStateTimeoutListeners) {
|
||||
if ((timestamp - holder.timestamp) > holder.timeout.millis()) {
|
||||
clusterStateTimeoutListeners.remove(holder);
|
||||
InternalClusterService.this.threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
holder.listener.onTimeout(holder.timeout);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}, timeoutInterval);
|
||||
}
|
||||
|
||||
@Override protected void doStop() throws ElasticSearchException {
|
||||
scheduledFuture.cancel(false);
|
||||
for (TimeoutHolder holder : clusterStateTimeoutListeners) {
|
||||
holder.listener.onTimeout(holder.timeout);
|
||||
for (Tuple<Timeout, TimeoutClusterStateListener> onGoingTimeout : onGoingTimeouts) {
|
||||
onGoingTimeout.v1().cancel();
|
||||
onGoingTimeout.v2().onClose();
|
||||
}
|
||||
updateTasksExecutor.shutdown();
|
||||
try {
|
||||
|
@ -122,12 +109,16 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
clusterStateListeners.remove(listener);
|
||||
}
|
||||
|
||||
public void add(TimeValue timeout, TimeoutClusterStateListener listener) {
|
||||
clusterStateTimeoutListeners.add(new TimeoutHolder(listener, System.currentTimeMillis(), timeout));
|
||||
public void add(TimeValue timeout, final TimeoutClusterStateListener listener) {
|
||||
Timeout timerTimeout = timerService.newTimeout(new NotifyTimeout(listener, timeout), timeout, TimerService.ExecutionType.THREADED);
|
||||
onGoingTimeouts.add(new Tuple<Timeout, TimeoutClusterStateListener>(timerTimeout, listener));
|
||||
clusterStateListeners.add(listener);
|
||||
// call the post added notification on the same event thread
|
||||
updateTasksExecutor.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.postAdded();
|
||||
}
|
||||
|
||||
public void remove(TimeoutClusterStateListener listener) {
|
||||
clusterStateTimeoutListeners.remove(new TimeoutHolder(listener, -1, null));
|
||||
});
|
||||
}
|
||||
|
||||
public void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask) {
|
||||
|
@ -194,9 +185,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
}
|
||||
}
|
||||
|
||||
for (TimeoutHolder timeoutHolder : clusterStateTimeoutListeners) {
|
||||
timeoutHolder.listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : clusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
|
@ -226,23 +214,21 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
});
|
||||
}
|
||||
|
||||
private static class TimeoutHolder {
|
||||
private class NotifyTimeout implements TimerTask {
|
||||
final TimeoutClusterStateListener listener;
|
||||
final long timestamp;
|
||||
final TimeValue timeout;
|
||||
|
||||
private TimeoutHolder(TimeoutClusterStateListener listener, long timestamp, TimeValue timeout) {
|
||||
private NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
|
||||
this.listener = listener;
|
||||
this.timestamp = timestamp;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override public int hashCode() {
|
||||
return listener.hashCode();
|
||||
@Override public void run(Timeout timeout) throws Exception {
|
||||
if (timeout.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@Override public boolean equals(Object obj) {
|
||||
return ((TimeoutHolder) obj).listener == listener;
|
||||
listener.onTimeout(this.timeout);
|
||||
// note, we rely on the listener to remove itself in case of timeout if needed
|
||||
}
|
||||
}
|
||||
}
|
|
@ -98,4 +98,10 @@ class RecoveryFileChunkRequest implements Streamable {
|
|||
out.writeVInt(contentLength);
|
||||
out.writeBytes(content, 0, contentLength);
|
||||
}
|
||||
|
||||
// Short textual summary of this chunk request: shard id, file name, position and length.
// The chunk content bytes themselves are not included.
@Override public String toString() {
|
||||
return shardId + ": name='" + name + '\'' +
|
||||
", position=" + position +
|
||||
", length=" + length;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index.shard.recovery;
|
||||
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -338,6 +339,9 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
} else {
|
||||
indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
|
||||
}
|
||||
if (indexOutput == null) {
|
||||
throw new ElasticSearchIllegalStateException("No ongoing output file to write to, request: " + request);
|
||||
}
|
||||
synchronized (indexOutput) {
|
||||
try {
|
||||
indexOutput.writeBytes(request.content(), request.contentLength());
|
||||
|
|
|
@ -107,7 +107,7 @@ public class TimerService extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override public void run(final Timeout timeout) throws Exception {
|
||||
threadPool.execute(new Runnable() {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
task.run(timeout);
|
||||
|
|
|
@ -87,11 +87,15 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
@Override protected void doStop() throws ElasticSearchException {
|
||||
transports.remove(localAddress);
|
||||
// now, go over all the transports connected to me, and raise disconnected event
|
||||
for (LocalTransport targetTransport : transports.values()) {
|
||||
for (Map.Entry<DiscoveryNode, LocalTransport> entry : targetTransport.connectedNodes.entrySet()) {
|
||||
for (final LocalTransport targetTransport : transports.values()) {
|
||||
for (final Map.Entry<DiscoveryNode, LocalTransport> entry : targetTransport.connectedNodes.entrySet()) {
|
||||
if (entry.getValue() == this) {
|
||||
targetTransport.threadPool().cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
targetTransport.disconnectFromNode(entry.getKey());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import static org.hamcrest.MatcherAssert.*;
|
|||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SimpleDataNodesTests extends AbstractNodesTests {
|
||||
|
||||
|
|
Loading…
Reference in New Issue