Add blocking socket based MockTcpTransport (#19332)

Today we have a bunch of tests that use the netty transport; mostly, these tests use it only
because they need to run some TCP based transport. Yet, this couples our tests tightly to the
netty implementation, which should be tested on its own. This change adds a plain socket based,
blocking TcpTransport implementation that is used by default in tests whenever local transport
is suppressed or the network node mode is selected. It also adds another TCP network
implementation that serves as a showcase of how the interface works.
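A minimal sketch (not part of this commit's diff) of how a test node would opt into the new transport, assuming the MockTcpTransportPlugin shown further below is installed:

import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.transport.MockTcpTransportPlugin;

class MockTcpSettingsSketch {
    static Settings mockTcpNodeSettings() {
        return Settings.builder()
            .put(Node.NODE_MODE_SETTING.getKey(), "network")                // run a real TCP transport
            .put(NetworkModule.TRANSPORT_TYPE_KEY,
                 MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME)            // "mock-socket-network"
            .build();
    }
}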
Simon Willnauer 2016-07-11 12:17:52 +02:00 committed by GitHub
parent 1d03a1409c
commit 3f3c93ec65
14 changed files with 532 additions and 101 deletions


@ -40,7 +40,7 @@ public class NetworkExceptionHelper {
}
if (e.getMessage() != null) {
// UGLY!, this exception messages seems to represent closed connection
if (e.getMessage().contains("Connection reset by peer")) {
if (e.getMessage().contains("Connection reset")) {
return true;
}
if (e.getMessage().contains("connection was aborted")) {


@ -352,7 +352,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return Arrays.asList(recovery, bulk, reg, state, ping);
}
public synchronized void close() {
public synchronized void close() throws IOException {
closeChannels(allChannels);
}
}
@ -433,7 +433,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
connectedNodes.remove(node);
try {
logger.debug("disconnecting from [{}], {}", node, reason);
nodeChannels.close();
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
@ -451,15 +451,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final void disconnectFromNodeChannel(final Channel channel, final Exception failure) {
threadPool.generic().execute(() -> {
try {
closeChannels(Collections.singletonList(channel));
} finally {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
try {
closeChannels(Collections.singletonList(channel));
} finally {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
}
}
}
} catch (IOException e) {
logger.warn("failed to close channel", e);
}
});
}
@ -479,7 +483,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
if (nodeChannels != null) {
try {
logger.debug("disconnecting from [{}] due to explicit disconnect call", node);
nodeChannels.close();
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}] due to explicit disconnect call", node);
transportServiceAdapter.raiseNodeDisconnected(node);
@ -766,7 +770,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
NodeChannels nodeChannels = it.next();
it.remove();
nodeChannels.close();
IOUtils.closeWhileHandlingException(nodeChannels);
}
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
@ -782,7 +786,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
NodeChannels nodeChannels = it.next();
it.remove();
nodeChannels.close();
IOUtils.closeWhileHandlingException(nodeChannels);
}
}
@ -800,7 +804,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
protected void onException(Channel channel, Exception e) {
protected void onException(Channel channel, Exception e) throws IOException {
if (!lifecycle.started()) {
// ignore
return;
@ -845,28 +849,28 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
* @param name the profile name
* @param address the address to bind to
*/
protected abstract Channel bind(String name, InetSocketAddress address);
protected abstract Channel bind(String name, InetSocketAddress address) throws IOException;
/**
* Closes all channels in this list
*/
protected abstract void closeChannels(List<Channel> channel);
protected abstract void closeChannels(List<Channel> channel) throws IOException;
/**
* Connects to the given node in a light way. This means we are not creating multiple connections like we do
* for production connections. This connection is for pings or handshakes
*/
protected abstract NodeChannels connectToChannelsLight(DiscoveryNode node);
protected abstract NodeChannels connectToChannelsLight(DiscoveryNode node) throws IOException;
protected abstract void sendMessage(Channel channel, BytesReference reference, Runnable sendListener, boolean close);
protected abstract void sendMessage(Channel channel, BytesReference reference, Runnable sendListener, boolean close) throws IOException;
/**
* Connects to the node in a <tt>heavy</tt> way.
*
* @see #connectToChannelsLight(DiscoveryNode)
*/
protected abstract NodeChannels connectToChannels(DiscoveryNode node);
protected abstract NodeChannels connectToChannels(DiscoveryNode node) throws IOException;
/**
* Called to tear down internal resources
@ -880,9 +884,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node, options);
if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}
@ -962,7 +964,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
if (compress) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
byte status = 0;
status = TransportStatus.setResponse(status); // TODO share some code with sendRequest
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
@ -1042,7 +1043,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
/**
* Validates the first N bytes of the message header and returns <code>true</code> if the message is
* Validates the first N bytes of the message header and returns <code>false</code> if the message is
* a ping message and has no payload ie. isn't a real user level message.
*
* @throws IllegalStateException if the message is too short, less than the header or less than the header plus the message size
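The hunks above replace nodeChannels.close() with IOUtils.closeWhileHandlingException(nodeChannels) now that NodeChannels.close() can throw IOException. A minimal standalone sketch of the two close idioms, assuming any java.io.Closeable resource such as NodeChannels:

import org.apache.lucene.util.IOUtils;
import java.io.Closeable;
import java.io.IOException;

class CloseIdioms {
    // best-effort close: IOExceptions are suppressed so callers in finally blocks don't fail
    static void closeQuietly(Closeable resource) {
        IOUtils.closeWhileHandlingException(resource);
    }

    // strict close: IOExceptions propagate to the caller
    static void closeOrThrow(Closeable resource) throws IOException {
        IOUtils.close(resource);
    }
}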


@ -66,21 +66,29 @@ public final class TcpTransportChannel<Channel> implements TransportChannel {
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
release();
transport.sendResponse(version, channel, response, requestId, action, options);
try {
transport.sendResponse(version, channel, response, requestId, action, options);
} finally {
release();
}
}
@Override
public void sendResponse(Exception exception) throws IOException {
release();
transport.sendErrorResponse(version, channel, exception, requestId, action);
try {
transport.sendErrorResponse(version, channel, exception, requestId, action);
} finally {
release();
}
}
private Exception releaseBy;
private void release() {
// attempt to release once atomically
if (released.compareAndSet(false, true) == false) {
throw new IllegalStateException("reserved bytes are already released");
throw new IllegalStateException("reserved bytes are already released", releaseBy);
} else {
assert (releaseBy = new Exception()) != null; // easier to debug if it's already closed
}
transport.getInFlightRequestBreaker().addWithoutBreaking(-reservedBytes);
}
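A standalone sketch of the release-once guard introduced above: the first release wins, a second attempt fails fast, and under assertions the error carries the stack trace of the original release to ease debugging.

import java.util.concurrent.atomic.AtomicBoolean;

class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private Exception releasedBy;

    void release() {
        if (released.compareAndSet(false, true) == false) {
            // second release: point the reader at the stack trace of the first one
            throw new IllegalStateException("already released", releasedBy);
        } else {
            // only recorded when assertions are enabled; free in production
            assert (releasedBy = new Exception("release() called here")) != null;
        }
    }
}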


@ -63,6 +63,7 @@ import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@ -336,7 +337,7 @@ public class NettyTransport extends TcpTransport<Channel> {
channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels, channels, channels);
}
protected NodeChannels connectToChannels(DiscoveryNode node) {
protected NodeChannels connectToChannels(DiscoveryNode node) throws IOException {
final NodeChannels nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk],
new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState],
new Channel[connectionsPerNodePing]);
@ -476,7 +477,9 @@ public class NettyTransport extends TcpTransport<Channel> {
public void operationComplete(final ChannelFuture future) throws Exception {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(future.getChannel())) {
threadPool.generic().execute(() -> disconnectFromNode(node, future.getChannel(), "channel closed event"));
threadPool.generic().execute(() -> {
disconnectFromNode(node, future.getChannel(), "channel closed event");
});
}
}
}


@ -520,7 +520,6 @@ public class ClusterServiceIT extends ESIntegTestCase {
assertThat(clusterService.state().nodes().getMasterNode(), notNullValue());
assertThat(clusterService.state().nodes().isLocalNodeElectedMaster(), is(true));
assertThat(testService.master(), is(true));
String node_1 = internalCluster().startNode(settings);
final ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class, node_1);
MasterAwareService testService1 = internalCluster().getInstance(MasterAwareService.class, node_1);


@ -39,6 +39,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.Transports;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -63,6 +64,7 @@ public class NettyTransportIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(Node.NODE_MODE_SETTING.getKey(), "network")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "exception-throwing").build();
}


@ -32,24 +32,22 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
@ -74,31 +72,18 @@ public class TribeIT extends ESIntegTestCase {
private Node tribeNode;
private Client tribeClient;
@BeforeClass
public static void setupSecondCluster() throws Exception {
ESIntegTestCase.beforeClass();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
}
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.emptyList();
}
@Override
public Settings transportClientSettings() {
return null;
}
};
cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), true, 2, 2,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList(), Function.identity());
cluster2.beforeTest(random(), 0.1);
cluster2.ensureAtLeastNumDataNodes(2);
@Before
public void setupSecondCluster() throws Exception {
if (cluster2 == null) {
final Tuple<String, NodeConfigurationSource> configSource = getNodeConfigSource();
final String nodeMode = configSource.v1();
final NodeConfigurationSource nodeConfigurationSource = configSource.v2();
cluster2 = new InternalTestCluster(nodeMode, randomLong(), createTempDir(), true, 2, 2,
UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(),
Function.identity());
cluster2.beforeTest(random(), 0.1);
cluster2.ensureAtLeastNumDataNodes(2);
}
}
@AfterClass


@ -29,6 +29,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@ -1707,28 +1708,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
default:
throw new ElasticsearchException("Scope not supported: " + scope);
}
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).
put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build();
}
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return ESIntegTestCase.this.nodePlugins();
}
@Override
public Settings transportClientSettings() {
return ESIntegTestCase.this.transportClientSettings();
}
@Override
public Collection<Class<? extends Plugin>> transportClientPlugins() {
return ESIntegTestCase.this.transportClientPlugins();
}
};
boolean supportsDedicatedMasters = getSupportsDedicatedMasters();
int numDataNodes = getNumDataNodes();
@ -1740,22 +1720,71 @@ public abstract class ESIntegTestCase extends ESTestCase {
minNumDataNodes = getMinNumDataNodes();
maxNumDataNodes = getMaxNumDataNodes();
}
Collection<Class<? extends Plugin>> mockPlugins = getMockPlugins();
Tuple<String, NodeConfigurationSource> configSource = getNodeConfigSource();
final String nodeMode = configSource.v1();
final NodeConfigurationSource nodeConfigurationSource = configSource.v2();
return new InternalTestCluster(nodeMode, seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
}
protected Tuple<String, NodeConfigurationSource> getNodeConfigSource() {
SuppressLocalMode noLocal = getAnnotation(this.getClass(), SuppressLocalMode.class);
SuppressNetworkMode noNetwork = getAnnotation(this.getClass(), SuppressNetworkMode.class);
String nodeMode = InternalTestCluster.configuredNodeMode();
Settings.Builder networkSettings = Settings.builder();
if (noLocal != null && noNetwork != null) {
throw new IllegalStateException("Can't suppress both network and local mode");
} else if (noLocal != null) {
nodeMode = "network";
if (addMockTransportService()) {
networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
}
} else if (noNetwork != null) {
nodeMode = "local";
if (addMockTransportService()) {
networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, AssertingLocalTransport.ASSERTING_TRANSPORT_NAME);
}
}
final boolean isNetwork = "network".equals(nodeMode);
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).put(networkSettings.build()).
put(ESIntegTestCase.this.nodeSettings(nodeOrdinal)).build();
}
Collection<Class<? extends Plugin>> mockPlugins = getMockPlugins();
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return ESIntegTestCase.this.nodePlugins();
}
return new InternalTestCluster(nodeMode, seed, createTempDir(), supportsDedicatedMasters, minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
@Override
public Settings transportClientSettings() {
return Settings.builder().put(networkSettings.build())
.put(ESIntegTestCase.this.transportClientSettings()).build();
}
@Override
public Collection<Class<? extends Plugin>> transportClientPlugins() {
Collection<Class<? extends Plugin>> plugins = ESIntegTestCase.this.transportClientPlugins();
if (isNetwork) {
plugins = new ArrayList<>(plugins);
plugins.add(MockTcpTransportPlugin.class);
}
return Collections.unmodifiableCollection(plugins);
}
};
return new Tuple<>(nodeMode, nodeConfigurationSource);
}
/**
* Iff this returns true, mock transport implementations are used for the test runs. Otherwise no mock transport impls are used.
* The default is <tt>true</tt>.
*/
protected boolean addMockTransportService() {
return true;
}
/**
@ -1771,7 +1800,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
protected Collection<Class<? extends Plugin>> getMockPlugins() {
final ArrayList<Class<? extends Plugin>> mocks = new ArrayList<>();
if (randomBoolean()) { // sometimes run without those completely
if (randomBoolean()) {
if (randomBoolean() && addMockTransportService()) {
mocks.add(MockTransportService.TestPlugin.class);
}
if (randomBoolean()) {
@ -1786,11 +1815,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
if (randomBoolean()) {
mocks.add(MockSearchService.TestPlugin.class);
}
if (randomBoolean()) {
mocks.add(AssertingLocalTransport.TestPlugin.class);
}
}
mocks.add(TestSeedPlugin.class);
if (addMockTransportService()) {
// add both mock plugins - local and tcp
mocks.add(AssertingLocalTransport.TestPlugin.class);
mocks.add(MockTcpTransportPlugin.class);
}
return Collections.unmodifiableList(mocks);
}
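Below, a hypothetical integration test (class and transport names are illustrative, not from this diff) showing how the new addMockTransportService() hook can be used to opt out of the mock transports and wire in a custom TCP transport, in the same spirit as the NettyTransportIT change above:

import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

public class MyCustomTransportIT extends ESIntegTestCase {
    @Override
    protected boolean addMockTransportService() {
        return false; // keep MockTcpTransport and AssertingLocalTransport out of this test
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder().put(super.nodeSettings(nodeOrdinal))
            .put(NetworkModule.TRANSPORT_TYPE_KEY, "my-transport") // hypothetical transport name
            .build();
    }
}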
@ -1799,7 +1830,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
public List<Setting<?>> getSettings() {
return Arrays.asList(INDEX_TEST_SEED_SETTING);
}
}
/**


@ -386,10 +386,6 @@ public final class InternalTestCluster extends TestCluster {
private Collection<Class<? extends Plugin>> getPlugins() {
Set<Class<? extends Plugin>> plugins = new HashSet<>(nodeConfigurationSource.nodePlugins());
plugins.addAll(mockPlugins);
if (isLocalTransportConfigured() == false) {
// this is crazy we must do this here...we should really just always be using local transport...
plugins.remove(AssertingLocalTransport.TestPlugin.class);
}
return plugins;
}


@ -47,13 +47,11 @@ import java.util.Random;
public class AssertingLocalTransport extends LocalTransport {
public static final String ASSERTING_TRANSPORT_NAME = "asserting_local";
public static class TestPlugin extends Plugin {
public void onModule(NetworkModule module) {
module.registerTransport("mock", AssertingLocalTransport.class);
}
@Override
public Settings additionalSettings() {
return Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "mock").build();
module.registerTransport(ASSERTING_TRANSPORT_NAME, AssertingLocalTransport.class);
}
@Override


@ -0,0 +1,339 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* This is a socket based blocking TcpTransport implementation that is used for tests
* that need real networking. This implementation is a test only implementation that implements
* the networking layer in the worst possible way since it blocks and uses a thread per request model.
*/
public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel> {
private final ExecutorService executor;
private final Version mockVersion;
@Inject
public MockTcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
this(transportName, settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService,
Version.CURRENT);
}
public MockTcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService, Version mockVersion) {
super(transportName, settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
// we have our own crazy cached threadpool; this one is not bounded at all...
// using the ES thread factory here is crucial for tests, otherwise disruption tests won't block that thread
executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX));
this.mockVersion = mockVersion;
}
@Override
protected InetSocketAddress getLocalAddress(MockChannel mockChannel) {
return mockChannel.localAddress;
}
@Override
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
ServerSocket socket = new ServerSocket();
socket.bind(address);
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings()));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.bytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
MockChannel serverMockChannel = new MockChannel(socket, name);
CountDownLatch started = new CountDownLatch(1);
executor.execute(() -> {
try {
started.countDown();
serverMockChannel.accept(executor);
} catch (IOException e) {
try {
onException(serverMockChannel, e);
} catch (IOException ex) {
logger.warn("failed on handling exception", ex);
}
}
});
try {
started.await();
} catch (InterruptedException e) {
Thread.interrupted();
}
return serverMockChannel;
}
private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
Socket socket = mockChannel.activeChannel;
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE];
int firstByte = input.read();
if (firstByte == -1) {
throw new IOException("Connection reset by peer");
}
minimalHeader[0] = (byte) firstByte;
minimalHeader[1] = (byte) input.read();
int msgSize = input.readInt();
if (msgSize == -1) {
socket.getOutputStream().flush();
} else {
BytesStreamOutput output = new BytesStreamOutput();
final byte[] buffer = new byte[msgSize];
input.readFully(buffer);
output.write(minimalHeader);
output.writeInt(msgSize);
output.write(buffer);
BytesReference bytes = output.bytes();
if (validateMessageHeader(bytes)) {
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize),
mockChannel, mockChannel.profile, remoteAddress, msgSize);
} else {
// ping message - we just drop all stuff
}
}
}
@Override
protected NodeChannels connectToChannelsLight(DiscoveryNode node) throws IOException {
return connectToChannels(node);
}
@Override
protected NodeChannels connectToChannels(DiscoveryNode node) throws IOException {
final NodeChannels nodeChannels = new NodeChannels(new MockChannel[1],
new MockChannel[1],
new MockChannel[1],
new MockChannel[1],
new MockChannel[1]);
boolean success = false;
final Socket socket = new Socket();
try {
Consumer<MockChannel> onClose = (channel) -> {
final NodeChannels connected = connectedNodes.get(node);
if (connected != null && connected.hasChannel(channel)) {
executor.execute(() -> {
disconnectFromNode(node, channel, "channel closed event");
});
}
};
InetSocketAddress address = ((InetSocketTransportAddress) node.getAddress()).address();
// we just use a single connection
configureSocket(socket);
socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis());
MockChannel channel = new MockChannel(socket, address, "none", onClose);
channel.loopRead(executor);
for (MockChannel[] channels : nodeChannels.getChannelArrays()) {
for (int i = 0; i < channels.length; i++) {
channels[i] = channel;
}
}
success = true;
} finally {
if (success == false) {
IOUtils.close(nodeChannels, socket);
}
}
return nodeChannels;
}
private void configureSocket(Socket socket) throws SocketException {
socket.setTcpNoDelay(TCP_NO_DELAY.get(settings));
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
if (tcpSendBufferSize.bytes() > 0) {
socket.setSendBufferSize(tcpSendBufferSize.bytesAsInt());
}
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.bytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings()));
}
@Override
protected boolean isOpen(MockChannel mockChannel) {
return mockChannel.isOpen.get();
}
@Override
protected void sendMessage(MockChannel mockChannel, BytesReference reference, Runnable sendListener, boolean close) throws IOException {
synchronized (mockChannel) {
final Socket socket = mockChannel.activeChannel;
OutputStream outputStream = new BufferedOutputStream(socket.getOutputStream());
reference.writeTo(outputStream);
outputStream.flush();
}
if (sendListener != null) {
sendListener.run();
}
if (close) {
IOUtils.closeWhileHandlingException(mockChannel);
}
}
@Override
protected void closeChannels(List<MockChannel> channel) throws IOException {
IOUtils.close(channel);
}
@Override
public long serverOpen() {
return 1;
}
public final class MockChannel implements Closeable {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
private final ServerSocket serverSocket;
private final ConcurrentHashMap<MockChannel, Boolean> workerChannels = new ConcurrentHashMap<>();
private final Socket activeChannel;
private final String profile;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final Closeable onClose;
public MockChannel(Socket socket, InetSocketAddress localAddress, String profile, Consumer<MockChannel> onClose) {
this.localAddress = localAddress;
this.activeChannel = socket;
this.serverSocket = null;
this.profile = profile;
this.onClose = () -> onClose.accept(this);
}
public void accept(Executor executor) throws IOException {
while (isOpen.get()) {
Socket accept = serverSocket.accept();
configureSocket(accept);
MockChannel mockChannel = new MockChannel(accept, localAddress, profile, workerChannels::remove);
workerChannels.put(mockChannel, Boolean.TRUE);
mockChannel.loopRead(executor);
}
}
public void loopRead(Executor executor) {
executor.execute(() -> {
try {
StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
while(isOpen.get()) {
cancellableThreads.executeIO(() -> readMessage(this, input));
}
} catch (Exception e) {
if (isOpen.get()) {
try {
onException(this, e);
} catch (IOException ex) {
logger.warn("failed on handling exception", ex);
}
}
}
});
}
public MockChannel(ServerSocket serverSocket, String profile) {
this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
this.serverSocket = serverSocket;
this.profile = profile;
this.activeChannel = null;
this.onClose = null;
}
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
IOUtils.close( () -> cancellableThreads.cancel("channel closed"), serverSocket, activeChannel,
() -> IOUtils.close(workerChannels.keySet()), onClose);
}
}
}
@Override
protected void doStart() {
boolean success = false;
try {
if (NetworkService.NETWORK_SERVER.get(settings)) {
// loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
final Settings settings = Settings.builder()
.put(entry.getValue()).build();
bindServer(entry.getKey(), settings);
}
}
super.doStart();
success = true;
} finally {
if (success == false) {
doStop();
}
}
}
@Override
protected void stopInternal() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
@Override
protected Version getCurrentVersion() {
return mockVersion;
}
}
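For reference, a rough sketch of the frame layout readMessage() above consumes: two marker bytes, a four byte big-endian length, then the payload, where a length of -1 denotes a ping with no payload. The concrete marker byte values are an assumption, not taken from this diff:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class FrameSketch {
    // builds a ping frame as readMessage() would see it on the wire
    static byte[] pingFrame() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte('E'); // marker byte 1 (assumed value)
        out.writeByte('S'); // marker byte 2 (assumed value)
        out.writeInt(-1);   // size of -1 => ping, no payload follows
        return bytes.toByteArray();
    }
}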


@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
public class MockTcpTransportPlugin extends Plugin {
public static final String MOCK_TCP_TRANSPORT_NAME = "mock-socket-network";
public void onModule(NetworkModule module) {
module.registerTransport(MOCK_TCP_TRANSPORT_NAME, MockTcpTransport.class);
}
}


@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.transport.MockTransportService;
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
@Override
protected MockTransportService build(Settings settings, Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new MockTcpTransport("mock", settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings), version);
MockTransportService mockTransportService = new MockTransportService(Settings.EMPTY, transport, threadPool);
mockTransportService.start();
return mockTransportService;
}
}