refactoring transport service to allow for send options
This commit is contained in:
parent
04781e038a
commit
3d31c38f11
|
@ -37,6 +37,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.*;
|
||||
|
||||
/**
|
||||
* A fault detection that pings the master periodically to see if its alive.
|
||||
|
@ -253,7 +254,7 @@ public class MasterFaultDetection extends AbstractComponent {
|
|||
threadPool.schedule(MasterPinger.this, pingInterval);
|
||||
return;
|
||||
}
|
||||
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), pingRetryTimeout,
|
||||
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout),
|
||||
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
|
||||
@Override public MasterPingResponseResponse newInstance() {
|
||||
return new MasterPingResponseResponse();
|
||||
|
@ -291,7 +292,7 @@ public class MasterFaultDetection extends AbstractComponent {
|
|||
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
|
||||
} else {
|
||||
// resend the request, not reschedule, rely on send timeout
|
||||
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), pingRetryTimeout, this);
|
||||
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
|
||||
import static org.elasticsearch.common.unit.TimeValue.*;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.*;
|
||||
|
||||
/**
|
||||
* A fault detection of multiple nodes.
|
||||
|
@ -193,7 +194,7 @@ public class NodesFaultDetection extends AbstractComponent {
|
|||
if (!running) {
|
||||
return;
|
||||
}
|
||||
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout,
|
||||
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout),
|
||||
new BaseTransportResponseHandler<PingResponse>() {
|
||||
@Override public PingResponse newInstance() {
|
||||
return new PingResponse();
|
||||
|
@ -227,7 +228,7 @@ public class NodesFaultDetection extends AbstractComponent {
|
|||
}
|
||||
} else {
|
||||
// resend the request, not reschedule, rely on send timeout
|
||||
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), pingRetryTimeout, this);
|
||||
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,7 +196,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||
}
|
||||
|
||||
final boolean disconnect = disconnectX;
|
||||
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TimeValue.timeValueMillis((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
|
||||
@Override public UnicastPingResponse newInstance() {
|
||||
return new UnicastPingResponse();
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.Streamable;
|
|||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
|
@ -93,5 +94,8 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
*/
|
||||
void disconnectFromNode(DiscoveryNode node);
|
||||
|
||||
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message) throws IOException, TransportException;
|
||||
/**
|
||||
* Sends the request to the node.
|
||||
*/
|
||||
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class TransportRequestOptions {
|
||||
|
||||
public static TransportRequestOptions options() {
|
||||
return new TransportRequestOptions();
|
||||
}
|
||||
|
||||
private TimeValue timeout;
|
||||
|
||||
public TransportRequestOptions withTimeout(long timeout) {
|
||||
return withTimeout(TimeValue.timeValueMillis(timeout));
|
||||
}
|
||||
|
||||
public TransportRequestOptions withTimeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
return this.timeout;
|
||||
}
|
||||
}
|
|
@ -29,12 +29,12 @@ import org.elasticsearch.common.timer.Timeout;
|
|||
import org.elasticsearch.common.timer.TimerTask;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.timer.TimerService;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
@ -160,9 +160,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
|
||||
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message,
|
||||
TimeValue timeout, TransportResponseHandler<T> handler) throws TransportException {
|
||||
@Nullable TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
|
||||
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler);
|
||||
sendRequest(node, action, message, timeout, futureHandler);
|
||||
sendRequest(node, action, message, options, futureHandler);
|
||||
return futureHandler;
|
||||
}
|
||||
|
||||
|
@ -172,15 +172,15 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
|
||||
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
|
||||
final TimeValue timeout, final TransportResponseHandler<T> handler) throws TransportException {
|
||||
@Nullable final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
|
||||
final long requestId = newRequestId();
|
||||
Timeout timeoutX = null;
|
||||
try {
|
||||
if (timeout != null) {
|
||||
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), timeout, TimerService.ExecutionType.THREADED);
|
||||
if (options != null && options.timeout() != null) {
|
||||
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED);
|
||||
}
|
||||
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX));
|
||||
transport.sendRequest(node, requestId, action, message);
|
||||
transport.sendRequest(node, requestId, action, message, options);
|
||||
} catch (final Exception e) {
|
||||
// usually happen either because we failed to connect to the node
|
||||
// or because we failed serializing the message
|
||||
|
|
|
@ -134,7 +134,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
}
|
||||
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message) throws IOException, TransportException {
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException {
|
||||
HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
|
||||
|
||||
stream.writeLong(requestId);
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -382,7 +383,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
|
||||
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
|
||||
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable) throws IOException, TransportException {
|
||||
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, @Nullable TransportRequestOptions options) throws IOException, TransportException {
|
||||
|
||||
Channel targetChannel = nodeChannel(node);
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.*;
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
@ -193,7 +194,7 @@ public abstract class AbstractSimpleTransportTests {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutNoResponse",
|
||||
new StringMessage("moshe"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
new StringMessage("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
@Override public StringMessage newInstance() {
|
||||
return new StringMessage();
|
||||
}
|
||||
|
@ -243,7 +244,7 @@ public abstract class AbstractSimpleTransportTests {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessage("300ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
new StringMessage("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
@Override public StringMessage newInstance() {
|
||||
return new StringMessage();
|
||||
}
|
||||
|
@ -271,7 +272,7 @@ public abstract class AbstractSimpleTransportTests {
|
|||
final int counter = i;
|
||||
// now, try and send another request, this times, with a short timeout
|
||||
res = serviceB.submitRequest(serviceANode, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessage(counter + "ms"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
new StringMessage(counter + "ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessage>() {
|
||||
@Override public StringMessage newInstance() {
|
||||
return new StringMessage();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue