[TEST] Randomize netty worker and connection parameters

Try and push our system to a state where there is only a single worker, trying to expose potential deadlocks when we by mistake execute blocking operations on the worker thread
closes #6635
This commit is contained in:
Shay Banon 2014-06-27 11:09:46 +02:00
parent c907ce325e
commit c9ff9a6930
2 changed files with 23 additions and 6 deletions

View File

@ -103,6 +103,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
NettyStaticSetup.setup();
}
public static final String WORKER_COUNT = "transport.netty.worker_count";
public static final String CONNECTIONS_PER_NODE_RECOVERY = "transport.connections_per_node.recovery";
public static final String CONNECTIONS_PER_NODE_BULK = "transport.connections_per_node.bulk";
public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg";
public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state";
public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping";
private final NetworkService networkService;
final Version version;
@ -182,7 +189,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
}
this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.workerCount = settings.getAsInt(WORKER_COUNT, EsExecutors.boundedNumberOfProcessors(settings) * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));
@ -197,11 +204,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2));
this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3));
this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6));
this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1));
this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));
this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt(CONNECTIONS_PER_NODE_RECOVERY, 2));
this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt(CONNECTIONS_PER_NODE_BULK, 3));
this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt(CONNECTIONS_PER_NODE_REG, 6));
this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt(CONNECTIONS_PER_NODE_STATE, 1));
this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt(CONNECTIONS_PER_NODE_PING, 1));
// we want to have at least 1 for reg/state/ping
if (this.connectionsPerNodeReg == 0) {

View File

@ -77,6 +77,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.Assert;
import java.io.Closeable;
@ -330,6 +331,15 @@ public final class InternalTestCluster extends TestCluster {
builder.put("indices.fielddata.cache.expire", TimeValue.timeValueMillis(1 + random.nextInt(10000)));
}
}
// randomize netty settings
if (random.nextBoolean()) {
builder.put(NettyTransport.WORKER_COUNT, random.nextInt(3) + 1);
builder.put(NettyTransport.CONNECTIONS_PER_NODE_RECOVERY, random.nextInt(2) + 1);
builder.put(NettyTransport.CONNECTIONS_PER_NODE_BULK, random.nextInt(3) + 1);
builder.put(NettyTransport.CONNECTIONS_PER_NODE_REG, random.nextInt(6) + 1);
}
return builder.build();
}