Remove the MockTcpTransport (#36628)

This commit removes all remaining usages of the `MockTcpTransport`.
Additionally it removes the `MockTcpTransport` and its test case.
This commit is contained in:
Tim Brooks 2018-12-14 10:59:07 -07:00 committed by GitHub
parent bb3ae18da5
commit fbf88b2ab7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 130 additions and 609 deletions

View File

@ -20,19 +20,20 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -72,8 +73,9 @@ public class Ec2DiscoveryTests extends ESTestCase {
@Before
public void createTransportService() {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
final Transport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService()) {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
// we just need to ensure we don't resolve DNS here

View File

@ -41,15 +41,15 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -174,9 +174,9 @@ public abstract class TaskManagerTestCase extends ESTestCase {
return discoveryNode.get();
};
transportService = new TransportService(settings,
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
new NetworkService(Collections.emptyList())),
new MockNioTransport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
new NoneCircuitBreakerService()),
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddressDiscoveryNodeFunction, null,
Collections.emptySet()) {
@Override

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.resync;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
@ -34,7 +35,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
@ -49,7 +50,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -99,8 +100,9 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
.addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL)
.addIndexBlock(indexName, IndexMetaData.INDEX_WRITE_BLOCK)));
try (MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), new NamedWriteableRegistry(emptyList()), new NetworkService(emptyList()))) {
try (MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(emptyList()),
new NoneCircuitBreakerService())) {
final MockTransportService transportService = new MockTransportService(Settings.EMPTY, transport, threadPool,
NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
@ -38,7 +39,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.shard.ShardId;
@ -49,8 +50,8 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -92,9 +93,9 @@ public class BroadcastReplicationTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY,
threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Collections.emptyList()));
MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT,
threadPool, new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()), circuitBreakerService);
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -56,7 +57,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
@ -75,13 +76,13 @@ import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.AfterClass;
@ -1064,8 +1065,9 @@ public class TransportReplicationActionTests extends ESTestCase {
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()));
final Transport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService());
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -19,21 +19,22 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.After;
import org.junit.Before;
@ -78,10 +79,11 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
@Before
public void createTransportSvc() {
final MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Collections.emptyList())) {
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -49,7 +48,6 @@ import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
@ -374,13 +372,14 @@ public class UnicastZenPingTests extends ESTestCase {
public void testPortLimit() throws InterruptedException {
final NetworkService networkService = new NetworkService(Collections.emptyList());
final Transport transport = new MockTcpTransport(
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
networkService) {
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
@ -415,13 +414,14 @@ public class UnicastZenPingTests extends ESTestCase {
public void testRemovingLocalAddresses() throws InterruptedException {
final NetworkService networkService = new NetworkService(Collections.emptyList());
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
final Transport transport = new MockTcpTransport(
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
networkService) {
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
@ -460,13 +460,14 @@ public class UnicastZenPingTests extends ESTestCase {
final NetworkService networkService = new NetworkService(Collections.emptyList());
final String hostname = randomAlphaOfLength(8);
final UnknownHostException unknownHostException = new UnknownHostException(hostname);
final Transport transport = new MockTcpTransport(
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
networkService) {
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
@ -506,13 +507,14 @@ public class UnicastZenPingTests extends ESTestCase {
final Logger logger = mock(Logger.class);
final NetworkService networkService = new NetworkService(Collections.emptyList());
final CountDownLatch latch = new CountDownLatch(1);
final Transport transport = new MockTcpTransport(
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
networkService) {
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
@ -571,13 +573,14 @@ public class UnicastZenPingTests extends ESTestCase {
NetworkService networkService = new NetworkService(Collections.emptyList());
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockNioTransport(
s,
Version.CURRENT,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
networkService);
new NoneCircuitBreakerService());
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier, EnumSet.allOf(Role.class));
closeables.push(handleA.transportService);

View File

@ -223,7 +223,7 @@ public class TcpTransportTests extends ESTestCase {
};
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
ConnectionProfile.Builder profileBuilder = new ConnectionProfile.Builder(MockTcpTransport.LIGHT_PROFILE);
ConnectionProfile.Builder profileBuilder = new ConnectionProfile.Builder(TestProfiles.LIGHT_PROFILE);
if (compressed) {
profileBuilder.setCompressionEnabled(true);
} else {

View File

@ -26,13 +26,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -60,14 +61,10 @@ public class TransportServiceHandshakeTests extends ESTestCase {
private List<TransportService> transportServices = new ArrayList<>();
private NetworkHandle startServices(String nodeNameAndId, Settings settings, Version version) {
MockTcpTransport transport =
new MockTcpTransport(
settings,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Collections.emptyList()));
MockNioTransport transport =
new MockNioTransport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService());
TransportService transportService = new MockTransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode(
nodeNameAndId,
@ -112,7 +109,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, MockTcpTransport.LIGHT_PROFILE)){
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, TestProfiles.LIGHT_PROFILE)){
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
assertNotNull(connectedNode);
// the name and version should be updated
@ -134,7 +131,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
MockTcpTransport.LIGHT_PROFILE)) {
TestProfiles.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
@ -155,7 +152,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
MockTcpTransport.LIGHT_PROFILE)) {
TestProfiles.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
@ -174,7 +171,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptySet(),
handleB.discoveryNode.getVersion());
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
handleA.transportService.connectToNode(discoveryNode, TestProfiles.LIGHT_PROFILE);
});
assertThat(ex.getMessage(), containsString("unexpected remote node"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
@ -192,7 +189,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptySet(),
handleB.discoveryNode.getVersion());
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
handleA.transportService.connectToNode(discoveryNode, TestProfiles.LIGHT_PROFILE);
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}
@ -210,7 +207,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptySet(),
handleB.discoveryNode.getVersion());
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
handleA.transportService.connectToNode(discoveryNode, TestProfiles.LIGHT_PROFILE);
});
assertThat(ex.getMessage(), containsString("unexpected remote node"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));

View File

@ -42,7 +42,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
@ -56,6 +56,7 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.After;
import org.junit.Before;
@ -1521,7 +1522,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// all is well
}
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, TestProfiles.LIGHT_PROFILE));
}
public void testMockUnresponsiveRule() throws IOException {
@ -1572,7 +1573,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// all is well
}
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, TestProfiles.LIGHT_PROFILE));
}
@ -2033,8 +2034,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService()) {
@Override
protected String handleRequest(TcpChannel mockChannel, String profileName, StreamInput stream, long requestId,
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)

View File

@ -1,477 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cli.SuppressForbidden;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a socket based blocking TcpTransport implementation that is used for tests
* that need real networking. This implementation is a test only implementation that implements
* the networking layer in the worst possible way since it blocks and uses a thread per request model.
*/
public class MockTcpTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(MockTcpTransport.class);
/**
* A pre-built light connection profile that shares a single connection across all
* types.
*/
static final ConnectionProfile LIGHT_PROFILE;
private final Set<MockChannel> openChannels = new HashSet<>();
static {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
LIGHT_PROFILE = builder.build();
}
private final ExecutorService executor;
public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
this(settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService, Version.CURRENT);
}
public MockTcpTransport(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService, Version mockVersion) {
super("mock-tcp-transport", settings, mockVersion, threadPool, PageCacheRecycler.NON_RECYCLING_INSTANCE, circuitBreakerService,
namedWriteableRegistry, networkService);
// we have our own crazy cached threadpool this one is not bounded at all...
// using the ES thread factory here is crucial for tests otherwise disruption tests won't block that thread
executor = Executors.newCachedThreadPool(EsExecutors.daemonThreadFactory(settings, Transports.TEST_MOCK_TRANSPORT_THREAD_PREFIX));
}
@Override
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
MockServerSocket socket = new MockServerSocket();
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
socket.bind(address);
MockChannel serverMockChannel = new MockChannel(socket, name);
CountDownLatch started = new CountDownLatch(1);
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
onException(serverMockChannel, e);
}
@Override
protected void doRun() throws Exception {
started.countDown();
serverMockChannel.accept(executor);
}
});
try {
started.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return serverMockChannel;
}
private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
Socket socket = mockChannel.activeChannel;
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE];
try {
input.readFully(minimalHeader);
} catch (EOFException eof) {
throw new IOException("Connection reset by peer");
}
// Read message length will throw stream corrupted exception if the marker bytes incorrect
int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader));
if (msgSize == -1) {
socket.getOutputStream().flush();
} else {
final byte[] buffer = new byte[msgSize];
input.readFully(buffer);
int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize;
try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) {
output.write(minimalHeader);
output.write(buffer);
consumeNetworkReads(mockChannel, output.bytes());
}
}
}
@Override
@SuppressForbidden(reason = "real socket for mocking remote connections")
protected MockChannel initiateChannel(DiscoveryNode node) throws IOException {
InetSocketAddress address = node.getAddress().address();
final MockSocket socket = new MockSocket();
final MockChannel channel = new MockChannel(socket, address, false, "none");
boolean success = false;
try {
configureSocket(socket);
success = true;
} finally {
if (success == false) {
IOUtils.close(socket);
}
}
executor.submit(() -> {
try {
socket.connect(address);
socket.setSoLinger(false, 0);
channel.connectFuture.complete(null);
channel.loopRead(executor);
} catch (Exception ex) {
channel.connectFuture.completeExceptionally(ex);
}
});
return channel;
}
@Override
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
Set<TransportRequestOptions.Type> types = handle.getTypes();
if (handle.length > 0) {
allTypesWithConnection.addAll(types);
} else {
allTypesWithoutConnection.addAll(types);
}
}
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
if (allTypesWithoutConnection.isEmpty() == false) {
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
}
builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile.getConnectTimeout());
builder.setPingInterval(connectionProfile.getPingInterval());
builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
return builder.build();
}
private void configureSocket(Socket socket) throws SocketException {
socket.setTcpNoDelay(TCP_NO_DELAY.get(settings));
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.get(settings);
if (tcpSendBufferSize.getBytes() > 0) {
socket.setSendBufferSize(tcpSendBufferSize.bytesAsInt());
}
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}
public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
private final ServerSocket serverSocket;
private final Set<MockChannel> workerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Socket activeChannel;
private final boolean isServer;
private final String profile;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final CompletableContext<Void> closeFuture = new CompletableContext<>();
private final CompletableContext<Void> connectFuture = new CompletableContext<>();
private final ChannelStats stats = new ChannelStats();
/**
* Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
*
* @param socket The client socket. Mut not be null.
* @param localAddress Address associated with the corresponding local server socket. Must not be null.
* @param profile The associated profile name.
*/
MockChannel(Socket socket, InetSocketAddress localAddress, boolean isServer, String profile) {
this.localAddress = localAddress;
this.activeChannel = socket;
this.isServer = isServer;
this.serverSocket = null;
this.profile = profile;
synchronized (openChannels) {
openChannels.add(this);
}
}
/**
* Constructs a new MockChannel instance intended for accepting requests.
*
* @param serverSocket The associated server socket. Must not be null.
* @param profile The associated profile name.
*/
MockChannel(ServerSocket serverSocket, String profile) {
this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
this.serverSocket = serverSocket;
this.profile = profile;
this.isServer = false;
this.activeChannel = null;
synchronized (openChannels) {
openChannels.add(this);
}
}
public void accept(Executor executor) throws IOException {
while (isOpen.get()) {
Socket incomingSocket = serverSocket.accept();
MockChannel incomingChannel = null;
try {
configureSocket(incomingSocket);
synchronized (this) {
if (isOpen.get()) {
InetSocketAddress localAddress = new InetSocketAddress(incomingSocket.getLocalAddress(),
incomingSocket.getPort());
incomingChannel = new MockChannel(incomingSocket, localAddress, true, profile);
MockChannel finalIncomingChannel = incomingChannel;
incomingChannel.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
workerChannels.remove(finalIncomingChannel);
}
@Override
public void onFailure(Exception e) {
workerChannels.remove(finalIncomingChannel);
}
});
serverAcceptedChannel(incomingChannel);
//establish a happens-before edge between closing and accepting a new connection
workerChannels.add(incomingChannel);
// this spawns a new thread immediately, so OK under lock
incomingChannel.loopRead(executor);
// the channel is properly registered and will be cleared by the close code.
incomingSocket = null;
incomingChannel = null;
}
}
} finally {
// ensure we don't leak sockets and channels in the failure case. Note that we null both
// if there are no exceptions so this becomes a no op.
IOUtils.closeWhileHandlingException(incomingSocket, incomingChannel);
}
}
}
void loopRead(Executor executor) {
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (isOpen.get()) {
try {
onException(MockChannel.this, e);
} catch (Exception ex) {
logger.warn("failed on handling exception", ex);
IOUtils.closeWhileHandlingException(MockChannel.this); // pure paranoia
}
}
}
@Override
protected void doRun() throws Exception {
StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
// There is a (slim) chance that we get interrupted right after a loop iteration, so check explicitly
while (isOpen.get() && !Thread.currentThread().isInterrupted()) {
cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input));
}
}
});
}
synchronized void close0() throws IOException {
// establish a happens-before edge between closing and accepting a new connection
// we have to sync this entire block to ensure that our openChannels checks work correctly.
// The close block below will close all worker channels but if one of the worker channels runs into an exception
// for instance due to a disconnect the handling of this exception might be executed concurrently.
// now if we are in-turn concurrently call close we might not wait for the actual close to happen and that will, down the road
// make the assertion trip that not all channels are closed.
if (isOpen.compareAndSet(true, false)) {
final boolean removedChannel;
synchronized (openChannels) {
removedChannel = openChannels.remove(this);
}
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels),
() -> cancellableThreads.cancel("channel closed"));
assert removedChannel: "Channel was not removed or removed twice?";
}
}
@Override
public String toString() {
return "MockChannel{" +
"profile='" + profile + '\'' +
", isOpen=" + isOpen +
", localAddress=" + localAddress +
", isServerSocket=" + (serverSocket != null) +
'}';
}
@Override
public void close() {
try {
close0();
closeFuture.complete(null);
} catch (IOException e) {
closeFuture.completeExceptionally(e);
}
}
@Override
public String getProfile() {
return profile;
}
@Override
public boolean isServerChannel() {
return isServer;
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeFuture.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public void addConnectListener(ActionListener<Void> listener) {
connectFuture.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public ChannelStats getChannelStats() {
return stats;
}
@Override
public boolean isOpen() {
return isOpen.get();
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) activeChannel.getRemoteSocketAddress();
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
try {
synchronized (this) {
OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream());
reference.writeTo(outputStream);
outputStream.flush();
}
listener.onResponse(null);
} catch (IOException e) {
listener.onFailure(e);
onException(this, e);
}
}
}
@Override
protected void doStart() {
boolean success = false;
try {
if (NetworkService.NETWORK_SERVER.get(settings)) {
// loop through all profiles and start them up, special handling for default one
for (ProfileSettings profileSettings : profileSettings) {
bindServer(profileSettings);
}
}
super.doStart();
success = true;
} finally {
if (success == false) {
doStop();
}
}
}
@Override
protected void stopInternal() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
synchronized (openChannels) {
assert openChannels.isEmpty() : "there are still open channels: " + openChannels;
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.settings.Settings;
final class TestProfiles {
private TestProfiles() {}
/**
* A pre-built light connection profile that shares a single connection across all
* types.
*/
static final ConnectionProfile LIGHT_PROFILE;
static {
ConnectionProfile source = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(source.getConnectTimeout());
builder.setHandshakeTimeout(source.getHandshakeTimeout());
builder.setCompressionEnabled(source.getCompressionEnabled());
builder.setPingInterval(source.getPingInterval());
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
LIGHT_PROFILE = builder.build();
}
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Collections;
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
@Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) {
@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
ActionListener<Version> listener) {
if (doHandshake) {
super.executeHandshake(node, channel, profile, listener);
} else {
listener.onResponse(version.minimumCompatibilityVersion());
}
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
@Override
public int channelsPerNodeConnection() {
return 1;
}
}