move to use scheduled TP and not timer service

This commit is contained in:
kimchy 2011-02-09 16:47:04 +02:00
parent ccb30d42e9
commit 27d6c71d5b
4 changed files with 43 additions and 39 deletions

View File

@ -34,14 +34,13 @@ import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -53,7 +52,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
*/
public class MetaDataDeleteIndexService extends AbstractComponent {
private final TimerService timerService;
private final ThreadPool threadPool;
private final ClusterService clusterService;
@ -61,10 +60,10 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private final NodeIndexDeletedAction nodeIndexDeletedAction;
@Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsAllocation shardsAllocation,
@Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation,
NodeIndexDeletedAction nodeIndexDeletedAction) {
super(settings);
this.timerService = timerService;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.shardsAllocation = shardsAllocation;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
@ -130,16 +129,14 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
threadPool.schedule(new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
}, request.timeout, ThreadPool.ExecutionType.DEFAULT);
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
} catch (Exception e) {
listener.onFailure(e);
@ -157,7 +154,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private final Listener listener;
volatile Timeout timeout;
volatile ScheduledFuture future;
private DeleteIndexListener(Request request, Listener listener) {
this.request = request;
@ -166,8 +163,8 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
@Override public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
if (future != null) {
future.cancel(false);
}
listener.onResponse(response);
}
@ -175,8 +172,8 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
@Override public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
if (future != null) {
future.cancel(false);
}
listener.onFailure(t);
}

View File

@ -27,17 +27,13 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
import java.util.Iterator;
@ -59,8 +55,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final ThreadPool threadPool;
private final TimerService timerService;
private final DiscoveryService discoveryService;
private final OperationRouting operationRouting;
@ -73,7 +67,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final Queue<Tuple<Timeout, NotifyTimeout>> onGoingTimeouts = new LinkedTransferQueue<Tuple<Timeout, NotifyTimeout>>();
private final Queue<NotifyTimeout> onGoingTimeouts = new LinkedTransferQueue<NotifyTimeout>();
private volatile ClusterState clusterState = newClusterStateBuilder().build();
@ -81,14 +75,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ScheduledFuture reconnectToNodes;
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool,
TimerService timerService) {
@Inject public InternalClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, ThreadPool threadPool) {
super(settings);
this.operationRouting = operationRouting;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.threadPool = threadPool;
this.timerService = timerService;
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(10));
}
@ -108,9 +100,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override protected void doStop() throws ElasticSearchException {
this.reconnectToNodes.cancel(true);
for (Tuple<Timeout, NotifyTimeout> onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.v1().cancel();
onGoingTimeout.v2().listener.onClose();
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.cancel();
onGoingTimeout.listener.onClose();
}
updateTasksExecutor.shutdown();
try {
@ -141,10 +133,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
public void remove(ClusterStateListener listener) {
clusterStateListeners.remove(listener);
for (Iterator<Tuple<Timeout, NotifyTimeout>> it = onGoingTimeouts.iterator(); it.hasNext();) {
Tuple<Timeout, NotifyTimeout> tuple = it.next();
if (tuple.v2().listener.equals(listener)) {
tuple.v1().cancel();
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext();) {
NotifyTimeout timeout = it.next();
if (timeout.listener.equals(listener)) {
timeout.cancel();
it.remove();
}
}
@ -156,8 +148,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return;
}
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
Timeout timerTimeout = timerService.newTimeout(notifyTimeout, timeout, TimerService.ExecutionType.THREADED);
onGoingTimeouts.add(new Tuple<Timeout, NotifyTimeout>(timerTimeout, notifyTimeout));
notifyTimeout.future = threadPool.schedule(notifyTimeout, timeout, ThreadPool.ExecutionType.THREADED);
onGoingTimeouts.add(notifyTimeout);
clusterStateListeners.add(listener);
// call the post added notification on the same event thread
updateTasksExecutor.execute(new Runnable() {
@ -265,17 +257,22 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
});
}
private class NotifyTimeout implements TimerTask {
class NotifyTimeout implements Runnable {
final TimeoutClusterStateListener listener;
final TimeValue timeout;
ScheduledFuture future;
private NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
this.listener = listener;
this.timeout = timeout;
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
public void cancel() {
future.cancel(false);
}
@Override public void run() {
if (future.isCancelled()) {
return;
}
if (lifecycle.stoppedOrClosed()) {

View File

@ -109,6 +109,11 @@ public interface ThreadPool extends Executor {
*/
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval);
/**
* Returns an estimated current time in milliseconds.
*/
long estimatedCurrentTimeInMillis();
static enum ExecutionType {
DEFAULT,
THREADED

View File

@ -107,6 +107,11 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return scheduledExecutorService.schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
}
@Override public long estimatedCurrentTimeInMillis() {
return System.currentTimeMillis();
}
@Override public void execute(Runnable command) {
executorService.execute(command);
}