Transport: Have a separate channel for recovery

Have a separate channel for recovery, so it won't overflow the "low" channel which is also used for bulk indexing.

Also, rename the channel names to be more descriptive. Change low to bulk (for bulk based operations, currently just bulk indexing), med to reg (for "regular" operations), and high to state (for state based communication). The new channel for recovery will be named recovery, and the ping channel will remain the same.

closes #3954
This commit is contained in:
Shay Banon 2013-10-23 15:55:27 -07:00
parent d18192b39f
commit 35b573ff24
9 changed files with 125 additions and 138 deletions

View File

@ -48,7 +48,7 @@ public class BulkAction extends Action<BulkRequest, BulkResponse, BulkRequestBui
@Override @Override
public TransportRequestOptions transportOptions(Settings settings) { public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.options() return TransportRequestOptions.options()
.withType(TransportRequestOptions.Type.fromString(settings.get("action.bulk.transport.type", TransportRequestOptions.Type.LOW.toString()))) .withType(TransportRequestOptions.Type.BULK)
.withCompress(settings.getAsBoolean("action.bulk.compress", true) .withCompress(settings.getAsBoolean("action.bulk.compress", true)
); );
} }

View File

@ -311,7 +311,7 @@ public class TransportClientNodesService extends AbstractComponent {
try { try {
NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME, NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME,
Requests.nodesInfoRequest("_local"), Requests.nodesInfoRequest("_local"),
TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
new FutureTransportResponseHandler<NodesInfoResponse>() { new FutureTransportResponseHandler<NodesInfoResponse>() {
@Override @Override
public NodesInfoResponse newInstance() { public NodesInfoResponse newInstance() {
@ -378,7 +378,7 @@ public class TransportClientNodesService extends AbstractComponent {
transportService.sendRequest(listedNode, ClusterStateAction.NAME, transportService.sendRequest(listedNode, ClusterStateAction.NAME,
Requests.clusterStateRequest() Requests.clusterStateRequest()
.filterAll().filterNodes(false).local(true), .filterAll().filterNodes(false).local(true),
TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
new BaseTransportResponseHandler<ClusterStateResponse>() { new BaseTransportResponseHandler<ClusterStateResponse>() {
@Override @Override

View File

@ -266,7 +266,7 @@ public class MasterFaultDetection extends AbstractComponent {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return; return;
} }
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() { new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override @Override
public MasterPingResponseResponse newInstance() { public MasterPingResponseResponse newInstance() {
@ -324,7 +324,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else { } else {
// resend the request, not reschedule, rely on send timeout // resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), this); transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
} }
} }
} }

View File

@ -200,7 +200,7 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) { if (!running) {
return; return;
} }
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withPingType().withTimeout(pingRetryTimeout), transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() { new BaseTransportResponseHandler<PingResponse>() {
@Override @Override
public PingResponse newInstance() { public PingResponse newInstance() {
@ -248,7 +248,7 @@ public class NodesFaultDetection extends AbstractComponent {
} else { } else {
// resend the request, not reschedule, rely on send timeout // resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withPingType().withTimeout(pingRetryTimeout), this); options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
} }
} }
} }

View File

@ -112,7 +112,7 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
} }
try { try {
TransportRequestOptions options = TransportRequestOptions.options().withHighType().withCompress(false); TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received // no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout // and not log an error if it arrives after the timeout
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,

View File

@ -200,7 +200,7 @@ public class RecoverySource extends AbstractComponent {
indexInput.readBytes(buf, 0, toRead, false); indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead); BytesArray content = new BytesArray(buf, 0, toRead);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content), transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead; readCount += toRead;
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -299,7 +299,7 @@ public class RecoverySource extends AbstractComponent {
} }
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0; ops = 0;
size = 0; size = 0;
operations.clear(); operations.clear();
@ -308,7 +308,7 @@ public class RecoverySource extends AbstractComponent {
// send the leftover // send the leftover
if (!operations.isEmpty()) { if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} }
return totalOperations; return totalOperations;
} }

View File

@ -34,18 +34,21 @@ public class TransportRequestOptions {
} }
public static enum Type { public static enum Type {
LOW, RECOVERY,
MED, BULK,
HIGH, REG,
STATE,
PING; PING;
public static Type fromString(String type) { public static Type fromString(String type) {
if ("low".equalsIgnoreCase(type)) { if ("bulk".equalsIgnoreCase(type)) {
return LOW; return BULK;
} else if ("med".equalsIgnoreCase(type)) { } else if ("reg".equalsIgnoreCase(type)) {
return MED; return REG;
} else if ("high".equalsIgnoreCase(type)) { } else if ("state".equalsIgnoreCase(type)) {
return HIGH; return STATE;
} else if ("recovery".equalsIgnoreCase(type)) {
return RECOVERY;
} else if ("ping".equalsIgnoreCase(type)) { } else if ("ping".equalsIgnoreCase(type)) {
return PING; return PING;
} else { } else {
@ -58,7 +61,7 @@ public class TransportRequestOptions {
private boolean compress; private boolean compress;
private Type type = Type.MED; private Type type = Type.REG;
public TransportRequestOptions withTimeout(long timeout) { public TransportRequestOptions withTimeout(long timeout) {
return withTimeout(TimeValue.timeValueMillis(timeout)); return withTimeout(TimeValue.timeValueMillis(timeout));
@ -79,39 +82,6 @@ public class TransportRequestOptions {
return this; return this;
} }
/**
* A request that requires very low latency.
*/
public TransportRequestOptions withPingType() {
this.type = Type.PING;
return this;
}
/**
* A channel reserved for high prio requests.
*/
public TransportRequestOptions withHighType() {
this.type = Type.HIGH;
return this;
}
/**
* The typical requests flows go through this one.
*/
public TransportRequestOptions withMedType() {
this.type = Type.MED;
return this;
}
/**
* Batch oriented (big payload) based requests use this one.
*/
public TransportRequestOptions withLowType() {
this.type = Type.LOW;
return this;
}
public TimeValue timeout() { public TimeValue timeout() {
return this.timeout; return this.timeout;
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -126,9 +127,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ByteSizeValue tcpReceiveBufferSize; final ByteSizeValue tcpReceiveBufferSize;
final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
final int connectionsPerNodeLow; final int connectionsPerNodeRecovery;
final int connectionsPerNodeMed; final int connectionsPerNodeBulk;
final int connectionsPerNodeHigh; final int connectionsPerNodeReg;
final int connectionsPerNodeState;
final int connectionsPerNodePing; final int connectionsPerNodePing;
final ByteSizeValue maxCumulationBufferCapacity; final ByteSizeValue maxCumulationBufferCapacity;
@ -183,11 +185,23 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", settings.getAsInt("transport.connections_per_node.low", 2)); this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2));
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6)); this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3));
this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1)); this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6));
this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1));
this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1)); this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));
// we want to have at least 1 for reg/state/ping
if (this.connectionsPerNodeReg == 0) {
throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.reg] to 0");
}
if (this.connectionsPerNodePing == 0) {
throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.ping] to 0");
}
if (this.connectionsPerNodeState == 0) {
throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.state] to 0");
}
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
@ -207,8 +221,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
} }
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}], receive_predictor[{}->{}]", logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
} }
public Settings settings() { public Settings settings() {
@ -603,7 +617,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (light) { if (light) {
nodeChannels = connectToChannelsLight(node); nodeChannels = connectToChannelsLight(node);
} else { } else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh], new Channel[connectionsPerNodePing]); nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try { try {
connectToChannels(nodeChannels, node); connectToChannels(nodeChannels, node);
} catch (Exception e) { } catch (Exception e) {
@ -646,54 +660,67 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Channel[] channels = new Channel[1]; Channel[] channels = new Channel[1];
channels[0] = connect.getChannel(); channels[0] = connect.getChannel();
channels[0].getCloseFuture().addListener(new ChannelCloseListener(node)); channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels, channels); return new NodeChannels(channels, channels, channels, channels, channels);
} }
private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length]; ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length]; ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length]; ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
for (int i = 0; i < connectLow.length; i++) { for (int i = 0; i < connectRecovery.length; i++) {
connectLow[i] = clientBootstrap.connect(address); connectRecovery[i] = clientBootstrap.connect(address);
} }
for (int i = 0; i < connectMed.length; i++) { for (int i = 0; i < connectBulk.length; i++) {
connectMed[i] = clientBootstrap.connect(address); connectBulk[i] = clientBootstrap.connect(address);
} }
for (int i = 0; i < connectHigh.length; i++) { for (int i = 0; i < connectReg.length; i++) {
connectHigh[i] = clientBootstrap.connect(address); connectReg[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectState.length; i++) {
connectState[i] = clientBootstrap.connect(address);
} }
for (int i = 0; i < connectPing.length; i++) { for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address); connectPing[i] = clientBootstrap.connect(address);
} }
try { try {
for (int i = 0; i < connectLow.length; i++) { for (int i = 0; i < connectRecovery.length; i++) {
connectLow[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectLow[i].isSuccess()) { if (!connectRecovery[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectLow[i].getCause()); throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
} }
nodeChannels.low[i] = connectLow[i].getChannel(); nodeChannels.recovery[i] = connectRecovery[i].getChannel();
nodeChannels.low[i].getCloseFuture().addListener(new ChannelCloseListener(node)); nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
} }
for (int i = 0; i < connectMed.length; i++) { for (int i = 0; i < connectBulk.length; i++) {
connectMed[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectMed[i].isSuccess()) { if (!connectBulk[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectMed[i].getCause()); throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
} }
nodeChannels.med[i] = connectMed[i].getChannel(); nodeChannels.bulk[i] = connectBulk[i].getChannel();
nodeChannels.med[i].getCloseFuture().addListener(new ChannelCloseListener(node)); nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
} }
for (int i = 0; i < connectHigh.length; i++) { for (int i = 0; i < connectReg.length; i++) {
connectHigh[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectHigh[i].isSuccess()) { if (!connectReg[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectHigh[i].getCause()); throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
} }
nodeChannels.high[i] = connectHigh[i].getChannel(); nodeChannels.reg[i] = connectReg[i].getChannel();
nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node)); nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectState.length; i++) {
connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectState[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
}
nodeChannels.state[i] = connectState[i].getChannel();
nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
} }
for (int i = 0; i < connectPing.length; i++) { for (int i = 0; i < connectPing.length; i++) {
@ -705,37 +732,19 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node)); nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
} }
if (nodeChannels.low.length == 0) { if (nodeChannels.recovery.length == 0) {
if (nodeChannels.med.length > 0) { if (nodeChannels.bulk.length > 0) {
nodeChannels.low = nodeChannels.med; nodeChannels.recovery = nodeChannels.bulk;
} else { } else {
nodeChannels.low = nodeChannels.high; nodeChannels.recovery = nodeChannels.reg;
} }
} }
if (nodeChannels.med.length == 0) { if (nodeChannels.bulk.length == 0) {
if (nodeChannels.high.length > 0) { nodeChannels.bulk = nodeChannels.reg;
nodeChannels.med = nodeChannels.high;
} else {
nodeChannels.med = nodeChannels.low;
}
}
if (nodeChannels.high.length == 0) {
if (nodeChannels.med.length > 0) {
nodeChannels.high = nodeChannels.med;
} else {
nodeChannels.high = nodeChannels.low;
}
}
if (nodeChannels.ping.length == 0) {
if (nodeChannels.high.length > 0) {
nodeChannels.ping = nodeChannels.high;
} else {
nodeChannels.ping = nodeChannels.med;
}
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
// clean the futures // clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectLow).add(connectMed).add(connectHigh).build()) { for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
future.cancel(); future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) { if (future.getChannel() != null && future.getChannel().isOpen()) {
try { try {
@ -843,24 +852,27 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static class NodeChannels { public static class NodeChannels {
private Channel[] low; private Channel[] recovery;
private final AtomicInteger lowCounter = new AtomicInteger(); private final AtomicInteger recoveryCounter = new AtomicInteger();
private Channel[] med; private Channel[] bulk;
private final AtomicInteger medCounter = new AtomicInteger(); private final AtomicInteger bulkCounter = new AtomicInteger();
private Channel[] high; private Channel[] reg;
private final AtomicInteger highCounter = new AtomicInteger(); private final AtomicInteger regCounter = new AtomicInteger();
private Channel[] state;
private final AtomicInteger stateCounter = new AtomicInteger();
private Channel[] ping; private Channel[] ping;
private final AtomicInteger pingCounter = new AtomicInteger(); private final AtomicInteger pingCounter = new AtomicInteger();
public NodeChannels(Channel[] low, Channel[] med, Channel[] high, Channel[] ping) { public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) {
this.low = low; this.recovery = recovery;
this.med = med; this.bulk = bulk;
this.high = high; this.reg = reg;
this.state = state;
this.ping = ping; this.ping = ping;
} }
public boolean hasChannel(Channel channel) { public boolean hasChannel(Channel channel) {
return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high) || hasChannel(channel, ping); return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping);
} }
private boolean hasChannel(Channel channel, Channel[] channels) { private boolean hasChannel(Channel channel, Channel[] channels) {
@ -873,22 +885,27 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
public Channel channel(TransportRequestOptions.Type type) { public Channel channel(TransportRequestOptions.Type type) {
if (type == TransportRequestOptions.Type.MED) { if (type == TransportRequestOptions.Type.REG) {
return med[Math.abs(medCounter.incrementAndGet()) % med.length]; return reg[Math.abs(regCounter.incrementAndGet()) % reg.length];
} else if (type == TransportRequestOptions.Type.HIGH) { } else if (type == TransportRequestOptions.Type.STATE) {
return high[Math.abs(highCounter.incrementAndGet()) % high.length]; return state[Math.abs(stateCounter.incrementAndGet()) % state.length];
} else if (type == TransportRequestOptions.Type.PING) { } else if (type == TransportRequestOptions.Type.PING) {
return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length]; return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length];
} else if (type == TransportRequestOptions.Type.BULK) {
return bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length];
} else if (type == TransportRequestOptions.Type.RECOVERY) {
return recovery[Math.abs(recoveryCounter.incrementAndGet()) % recovery.length];
} else { } else {
return low[Math.abs(lowCounter.incrementAndGet()) % low.length]; throw new ElasticSearchIllegalArgumentException("no type channel for [" + type + "]");
} }
} }
public synchronized void close() { public synchronized void close() {
List<ChannelFuture> futures = new ArrayList<ChannelFuture>(); List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
closeChannelsAndWait(low, futures); closeChannelsAndWait(recovery, futures);
closeChannelsAndWait(med, futures); closeChannelsAndWait(bulk, futures);
closeChannelsAndWait(high, futures); closeChannelsAndWait(reg, futures);
closeChannelsAndWait(state, futures);
closeChannelsAndWait(ping, futures); closeChannelsAndWait(ping, futures);
for (ChannelFuture future : futures) { for (ChannelFuture future : futures) {
future.awaitUninterruptibly(); future.awaitUninterruptibly();

View File

@ -87,7 +87,7 @@ public class BenchmarkNettyLargeMessages {
public void run() { public void run() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload);
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() { transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override @Override
public BenchmarkMessageResponse newInstance() { public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse(); return new BenchmarkMessageResponse();
@ -119,7 +119,7 @@ public class BenchmarkNettyLargeMessages {
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES); BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() { transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
@Override @Override
public BenchmarkMessageResponse newInstance() { public BenchmarkMessageResponse newInstance() {
return new BenchmarkMessageResponse(); return new BenchmarkMessageResponse();