move to use scheduled TP and not timer service

This commit is contained in:
kimchy 2011-02-09 17:21:16 +02:00
parent 27d6c71d5b
commit 9f2afeb4ca
11 changed files with 62 additions and 74 deletions

View File

@ -26,10 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
*/
public class ReceiveTimeoutTransportException extends ActionTransportException {
public ReceiveTimeoutTransportException(DiscoveryNode node, String action) {
super(node.name(), node.address(), action, null);
}
public ReceiveTimeoutTransportException(DiscoveryNode node, String action, String msg) {
super(node.name(), node.address(), action, msg, null);
}

View File

@ -25,20 +25,18 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
@ -53,8 +51,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final ThreadPool threadPool;
private final TimerService timerService;
final ConcurrentMap<String, TransportRequestHandler> serverHandlers = newConcurrentMap();
final ConcurrentMapLong<RequestHolder> clientHandlers = ConcurrentCollections.newConcurrentMapLong();
@ -78,15 +74,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private boolean throwConnectException = false;
public TransportService(Transport transport, ThreadPool threadPool, TimerService timerService) {
this(EMPTY_SETTINGS, transport, threadPool, timerService);
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool);
}
@Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TimerService timerService) {
@Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
this.timerService = timerService;
}
@Override protected void doStart() throws ElasticSearchException {
@ -173,19 +168,20 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
final long requestId = newRequestId();
Timeout timeoutX = null;
TimeoutHandler timeoutHandler = null;
try {
if (options.timeout() != null) {
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED);
timeoutHandler = new TimeoutHandler(requestId);
timeoutHandler.future = threadPool.schedule(timeoutHandler, options.timeout(), ThreadPool.ExecutionType.THREADED);
}
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX));
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutHandler));
transport.sendRequest(node, requestId, action, message, options);
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
clientHandlers.remove(requestId);
if (timeoutX != null) {
timeoutX.cancel();
if (timeoutHandler != null) {
timeoutHandler.future.cancel(false);
}
if (throwConnectException) {
if (e instanceof ConnectTransportException) {
@ -248,15 +244,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// lets see if its in the timeout holder
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
if (timeoutInfoHolder != null) {
logger.warn("Received response for a request that has timed out, action [{}], node [{}], id [{}]", timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
long time = System.currentTimeMillis();
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
} else {
logger.warn("Transport response handler not found of id [{}]", requestId);
}
return null;
}
if (holder.timeout() != null) {
holder.timeout().cancel();
}
holder.cancel();
return holder.handler();
}
@ -297,23 +292,32 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
class TimeoutTimerTask implements TimerTask {
class TimeoutHandler implements Runnable {
private final long requestId;
TimeoutTimerTask(long requestId) {
private final long sentTime = System.currentTimeMillis();
ScheduledFuture future;
TimeoutHandler(long requestId) {
this.requestId = requestId;
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
public long sentTime() {
return sentTime;
}
@Override public void run() {
if (future.isCancelled()) {
return;
}
final RequestHolder holder = clientHandlers.remove(requestId);
if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action()));
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action(), "request_id [" + requestId + "]"));
long timeoutTime = System.currentTimeMillis();
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action(), sentTime, timeoutTime));
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action(), "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
}
}
}
@ -325,9 +329,15 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final String action;
TimeoutInfoHolder(DiscoveryNode node, String action) {
private final long sentTime;
private final long timeoutTime;
TimeoutInfoHolder(DiscoveryNode node, String action, long sentTime, long timeoutTime) {
this.node = node;
this.action = action;
this.sentTime = sentTime;
this.timeoutTime = timeoutTime;
}
public DiscoveryNode node() {
@ -337,6 +347,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public String action() {
return action;
}
public long sentTime() {
return sentTime;
}
public long timeoutTime() {
return timeoutTime;
}
}
static class RequestHolder<T extends Streamable> {
@ -347,9 +365,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final String action;
private final Timeout timeout;
private final TimeoutHandler timeout;
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, Timeout timeout) {
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, TimeoutHandler timeout) {
this.handler = handler;
this.node = node;
this.action = action;
@ -368,8 +386,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return this.action;
}
public Timeout timeout() {
return timeout;
public void cancel() {
if (timeout != null) {
timeout.future.cancel(false);
}
}
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.testng.annotations.Test;
@ -43,12 +42,11 @@ public class MulticastZenPingTests {
@Test public void testSimplePings() {
ThreadPool threadPool = new CachedThreadPool();
TimerService timerService = new TimerService(threadPool);
ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
final TransportService transportServiceB = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
final TransportService transportServiceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress());
MulticastZenPing zenPingA = new MulticastZenPing(threadPool, transportServiceA, clusterName);

View File

@ -48,13 +48,13 @@ public class UnicastZenPingTests {
TimerService timerService = new TimerService(threadPool);
ClusterName clusterName = new ClusterName("test");
NettyTransport transportA = new NettyTransport(threadPool);
final TransportService transportServiceA = new TransportService(transportA, threadPool, timerService).start();
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
NettyTransport transportB = new NettyTransport(threadPool);
final TransportService transportServiceB = new TransportService(transportB, threadPool, timerService).start();
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress());
InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -45,7 +44,6 @@ import static org.hamcrest.Matchers.*;
public abstract class AbstractSimpleTransportTests {
protected ThreadPool threadPool;
protected TimerService timerService;
protected TransportService serviceA;
protected TransportService serviceB;
@ -54,7 +52,6 @@ public abstract class AbstractSimpleTransportTests {
@BeforeMethod public void setUp() {
threadPool = new CachedThreadPool();
timerService = new TimerService(threadPool);
build();
serviceA.connectToNode(serviceBNode);
serviceB.connectToNode(serviceANode);
@ -76,7 +73,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
@ -94,7 +90,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@ -112,8 +107,6 @@ public abstract class AbstractSimpleTransportTests {
}
serviceA.removeHandler("sayHello");
System.out.println("after ...");
}
@ -124,7 +117,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message), TransportResponseOptions.options().withCompress());
@ -142,7 +134,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@ -160,8 +151,6 @@ public abstract class AbstractSimpleTransportTests {
}
serviceA.removeHandler("sayHello");
System.out.println("after ...");
}
@Test public void testErrorMessage() {
@ -171,7 +160,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
@ -200,9 +188,6 @@ public abstract class AbstractSimpleTransportTests {
}
serviceA.removeHandler("sayHelloException");
System.out.println("after ...");
}
@Test
@ -229,7 +214,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
// don't send back a response
// try {
@ -264,8 +248,6 @@ public abstract class AbstractSimpleTransportTests {
}
serviceA.removeHandler("sayHelloTimeoutNoResponse");
System.out.println("after ...");
}
@Test public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
@ -275,7 +257,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
TimeValue sleep = TimeValue.parseTimeValue(request.message, null);
try {
Thread.sleep(sleep.millis());
@ -326,7 +307,6 @@ public abstract class AbstractSimpleTransportTests {
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello " + counter + "ms", equalTo(response.message));
}
@ -341,8 +321,6 @@ public abstract class AbstractSimpleTransportTests {
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
System.out.println("after ...");
}
private class StringMessage implements Streamable {

View File

@ -28,10 +28,10 @@ import org.testng.annotations.Test;
public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
@Override protected void build() {
serviceA = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}
}

View File

@ -32,10 +32,10 @@ import static org.elasticsearch.common.settings.ImmutableSettings.*;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
@Override protected void build() {
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool, timerService).start();
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool, timerService).start();
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}

View File

@ -60,7 +60,7 @@ public class BenchmarkNettyClient {
final ThreadPool threadPool = new CachedThreadPool(settings);
// final ThreadPool threadPool = new ScalingThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999));

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
@ -59,8 +58,7 @@ public class BenchmarkNettyClientBlocking {
final ThreadPool threadPool = new CachedThreadPool(settings);
// final ThreadPool threadPool = new ScalingThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999));

View File

@ -52,8 +52,8 @@ public class BenchmarkNettyLargeMessages {
final ThreadPool threadPool = new CachedThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300));
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -44,8 +43,7 @@ public class BenchmarkNettyServer {
final ThreadPool threadPool = new CachedThreadPool(settings);
// final ThreadPool threadPool = new ScalingThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
transportService.registerHandler("benchmark", new BaseTransportRequestHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {