diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 52eb0dbb7df..d65f3a633ca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -37,6 +37,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.*; +import static org.elasticsearch.transport.TransportRequestOptions.*; /** * A fault detection that pings the master periodically to see if its alive. @@ -253,7 +254,7 @@ public class MasterFaultDetection extends AbstractComponent { threadPool.schedule(MasterPinger.this, pingInterval); return; } - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), pingRetryTimeout, + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); @@ -291,7 +292,7 @@ public class MasterFaultDetection extends AbstractComponent { notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), pingRetryTimeout, this); + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index cac414b6b83..aa4879f9765 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.cluster.node.DiscoveryNodes.*; import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*; +import static org.elasticsearch.transport.TransportRequestOptions.*; /** * A fault detection of multiple nodes. @@ -193,7 +194,7 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { return new PingResponse(); @@ -227,7 +228,7 @@ public class NodesFaultDetection extends AbstractComponent { } } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, this); + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 9b13cb01356..4e17bb5c691 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -196,7 +196,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } final boolean disconnect = disconnectX; - transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TimeValue.timeValueMillis((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { + transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { @Override public UnicastPingResponse newInstance() { return new UnicastPingResponse(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java index 9a6ed56d203..0728b5e34d5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import javax.annotation.Nullable; import java.io.IOException; /** @@ -93,5 +94,8 @@ public interface Transport extends LifecycleComponent { */ void disconnectFromNode(DiscoveryNode node); - void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message) throws IOException, TransportException; + /** + * Sends the request to the node. + */ + void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java new file mode 100644 index 00000000000..ba69a472fd8 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.unit.TimeValue; + +/** + * @author kimchy (shay.banon) + */ +public class TransportRequestOptions { + + public static TransportRequestOptions options() { + return new TransportRequestOptions(); + } + + private TimeValue timeout; + + public TransportRequestOptions withTimeout(long timeout) { + return withTimeout(TimeValue.timeValueMillis(timeout)); + } + + public TransportRequestOptions withTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + public TimeValue timeout() { + return this.timeout; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 76dba0116fe..7f9d641b42c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -29,12 +29,12 @@ import org.elasticsearch.common.timer.Timeout; import org.elasticsearch.common.timer.TimerTask; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.timer.TimerService; +import javax.annotation.Nullable; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -160,9 +160,9 @@ public class TransportService extends AbstractLifecycleComponent TransportFuture submitRequest(DiscoveryNode node, String action, Streamable message, - TimeValue timeout, TransportResponseHandler handler) throws TransportException { + @Nullable TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture(handler); - sendRequest(node, action, message, timeout, futureHandler); + sendRequest(node, action, message, options, futureHandler); return futureHandler; } @@ -172,15 +172,15 @@ public class TransportService extends AbstractLifecycleComponent void sendRequest(final DiscoveryNode node, final String action, final Streamable message, - final TimeValue timeout, final TransportResponseHandler handler) throws TransportException { + @Nullable final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { final long requestId = newRequestId(); Timeout timeoutX = null; try { - if (timeout != null) { - timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), timeout, TimerService.ExecutionType.THREADED); + if (options != null && options.timeout() != null) { + timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED); } clientHandlers.put(requestId, new RequestHolder(handler, node, action, timeoutX)); - transport.sendRequest(node, requestId, action, message); + transport.sendRequest(node, requestId, action, message, options); } catch (final Exception e) { // usually happen either because we failed to connect to the node // or because we failed serializing the message diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 3157562e689..12177ca91ca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -134,7 +134,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } } - @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message) throws IOException, TransportException { + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException { HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); stream.writeLong(requestId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index ea890632b4a..c7e909c228b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import javax.annotation.Nullable; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -382,7 +383,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; - @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable) throws IOException, TransportException { + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, @Nullable TransportRequestOptions options) throws IOException, TransportException { Channel targetChannel = nodeChannel(node); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index 20d536d60f6..e8cb04110fd 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.transport.TransportRequestOptions.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -193,7 +194,7 @@ public abstract class AbstractSimpleTransportTests { }); TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse", - new StringMessage("moshe"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler() { + new StringMessage("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessage newInstance() { return new StringMessage(); } @@ -243,7 +244,7 @@ public abstract class AbstractSimpleTransportTests { }); TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", - new StringMessage("300ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler() { + new StringMessage("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessage newInstance() { return new StringMessage(); } @@ -271,7 +272,7 @@ public abstract class AbstractSimpleTransportTests { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse", - new StringMessage(counter + "ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler() { + new StringMessage(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler() { @Override public StringMessage newInstance() { return new StringMessage(); }