Zen2: Add DisruptableMockTransport (#33713)

Adds a mock transport implementation that allows to simulate network disruptions.
This commit is contained in:
Yannick Welsch 2018-09-18 11:48:24 +02:00 committed by GitHub
parent c79fbea923
commit 758b2f9111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 849 additions and 256 deletions

View File

@ -65,7 +65,7 @@ public class GetIndexActionTests extends ESSingleNodeTestCase {
clusterService = getInstanceFromNode(ClusterService.class);
indicesService = getInstanceFromNode(IndicesService.class);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, emptySet());
transportService.start();

View File

@ -77,7 +77,7 @@ public class GetSettingsActionTests extends ESTestCase {
threadPool = new TestThreadPool("GetSettingsActionTests");
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -80,7 +80,7 @@ public class TransportBulkActionTests extends ESTestCase {
threadPool = new TestThreadPool("TransportBulkActionTookTests");
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -92,7 +92,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
private TransportBulkAction createAction(boolean controlled, AtomicLong expected) {
CapturingTransport capturingTransport = new CapturingTransport();
TransportService transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -191,7 +191,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
TransportService transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService transportService = transport.createTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -87,7 +87,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -180,7 +180,7 @@ public class TransportNodesActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
transportService = transport.createTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -164,7 +164,7 @@ public class TransportReplicationActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -255,7 +255,7 @@ public class TransportWriteActionTests extends ESTestCase {
public void testReplicaProxy() throws InterruptedException, ExecutionException {
CapturingTransport transport = new CapturingTransport();
TransportService transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -143,7 +143,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
transportService = transport.createTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -124,7 +124,7 @@ public class ShardStateActionTests extends ESTestCase {
super.setUp();
this.transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
transportService = transport.createTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -40,7 +40,7 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
@ -192,7 +192,7 @@ public class CoordinatorTests extends ESTestCase {
private final PersistedState persistedState;
private MasterService masterService;
private TransportService transportService;
private CapturingTransport capturingTransport;
private MockTransport mockTransport;
ClusterNode(int nodeIndex) {
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
@ -214,7 +214,7 @@ public class CoordinatorTests extends ESTestCase {
}
private void setUp() {
capturingTransport = new CapturingTransport() {
mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(localNode) == false : "non-local message from " + localNode + " to itself";
@ -243,7 +243,7 @@ public class CoordinatorTests extends ESTestCase {
destinationNode -> {
final RequestHandlerRegistry requestHandler
= destinationNode.capturingTransport.getRequestHandler(action);
= destinationNode.mockTransport.getRequestHandler(action);
final TransportChannel transportChannel = new TransportChannel() {
@Override
@ -410,7 +410,7 @@ public class CoordinatorTests extends ESTestCase {
});
masterService.start();
transportService = capturingTransport.createCapturingTransportService(
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportException;
@ -69,7 +69,7 @@ public class PreVoteCollectorTests extends ESTestCase {
public void createObjects() {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings);
final CapturingTransport capturingTransport = new CapturingTransport() {
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(final long requestId, final String action, final TransportRequest request,
final DiscoveryNode node) {
@ -103,7 +103,7 @@ public class PreVoteCollectorTests extends ESTestCase {
localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(localNode, new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion));
transportService = capturingTransport.createCapturingTransportService(settings,
transportService = mockTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddress -> localNode, null, emptySet());
transportService.start();

View File

@ -95,7 +95,7 @@ public class ClusterStateHealthTests extends ESTestCase {
super.setUp();
clusterService = createClusterService(threadPool);
CapturingTransport transport = new CapturingTransport();
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
@ -69,7 +69,7 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
remoteClusterName = null;
dropHandshake = false;
final CapturingTransport capturingTransport = new CapturingTransport() {
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
super.onSendRequest(requestId, action, request, node);
@ -81,7 +81,7 @@ public class HandshakingTransportAddressConnectorTests extends ESTestCase {
}
};
transportService = capturingTransport.createCapturingTransportService(settings, threadPool,
transportService = mockTransport.createTransportService(settings, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
transportService.start();

View File

@ -56,7 +56,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -0,0 +1,189 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import java.util.Optional;
public abstract class DisruptableMockTransport extends MockTransport {
private final Logger logger;
public DisruptableMockTransport(Logger logger) {
this.logger = logger;
}
protected abstract DiscoveryNode getLocalNode();
protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination);
protected abstract Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node);
protected abstract void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery);
private void sendFromTo(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
handle(sender, destination, new Runnable() {
@Override
public void run() {
if (getDisruptedCapturingTransport(destination).isPresent()) {
doDelivery.run();
} else {
logger.trace("unknown destination in {}", this);
}
}
@Override
public String toString() {
return doDelivery.toString() + " from " + sender + " to " + destination;
}
});
}
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
assert destination.equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself";
super.onSendRequest(requestId, action, request, destination);
final String requestDescription = new ParameterizedMessage("{}[{}] from {} to {}",
action, requestId, getLocalNode(), destination).getFormattedMessage();
final Runnable returnConnectException = new Runnable() {
@Override
public void run() {
handleError(requestId, new ConnectTransportException(destination, "disconnected"));
}
@Override
public String toString() {
return "disconnection response to " + requestDescription;
}
};
sendFromTo(getLocalNode(), destination, new Runnable() {
@Override
public void run() {
switch (getConnectionStatus(getLocalNode(), destination)) {
case BLACK_HOLE:
logger.trace("dropping {}", requestDescription);
break;
case DISCONNECTED:
sendFromTo(destination, getLocalNode(), returnConnectException);
break;
case CONNECTED:
Optional<DisruptableMockTransport> destinationTransport = getDisruptedCapturingTransport(destination);
assert destinationTransport.isPresent();
final RequestHandlerRegistry<TransportRequest> requestHandler =
destinationTransport.get().getRequestHandler(action);
final TransportChannel transportChannel = new TransportChannel() {
@Override
public String getProfileName() {
return "default";
}
@Override
public String getChannelType() {
return "disruptable-mock-transport-channel";
}
@Override
public void sendResponse(final TransportResponse response) {
sendFromTo(destination, getLocalNode(), new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleResponse(requestId, response);
}
}
@Override
public String toString() {
return "response to " + requestDescription;
}
});
}
@Override
public void sendResponse(TransportResponse response,
TransportResponseOptions options) {
sendResponse(response);
}
@Override
public void sendResponse(Exception exception) {
sendFromTo(destination, getLocalNode(), new Runnable() {
@Override
public void run() {
if (getConnectionStatus(destination, getLocalNode()) != ConnectionStatus.CONNECTED) {
logger.trace("dropping response to {}: channel is not CONNECTED",
requestDescription);
} else {
handleRemoteError(requestId, exception);
}
}
@Override
public String toString() {
return "error response to " + requestDescription;
}
});
}
};
try {
requestHandler.processMessageReceived(request, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
} catch (Exception ee) {
logger.warn("failed to send failure", e);
}
}
}
}
@Override
public String toString() {
return requestDescription;
}
});
}
public enum ConnectionStatus {
CONNECTED,
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
BLACK_HOLE // network traffic to or from the corresponding node is silently discarded
}
}

View File

@ -19,61 +19,22 @@
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import static org.apache.lucene.util.LuceneTestCase.rarely;
/**
* A transport class that doesn't send anything but rather captures all requests for inspection from tests
*/
public class CapturingTransport implements Transport {
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportMessageListener listener;
public class CapturingTransport extends MockTransport implements Transport {
public static class CapturedRequest {
public final DiscoveryNode node;
@ -89,20 +50,8 @@ public class CapturingTransport implements Transport {
}
}
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
public TransportService createCapturingTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
}
/**
* returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()}
*/
@ -160,188 +109,10 @@ public class CapturingTransport implements Transport {
capturedRequests.clear();
}
/**
* simulate a response for the given requestId
*/
public void handleResponse(final long requestId, final TransportResponse response) {
responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
}
/**
* simulate a local error for the given requestId, will be wrapped
* by a {@link SendRequestTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleLocalError(final long requestId, final Throwable t) {
Tuple<DiscoveryNode, String> request = requests.get(requestId);
assert request != null;
this.handleError(requestId, new SendRequestTransportException(request.v1(), request.v2(), t));
}
/**
* simulate a remote error for the given requestId, will be wrapped
* by a {@link RemoteTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleRemoteError(final long requestId, final Throwable t) {
final RemoteTransportException remoteException;
if (rarely(Randomness.get())) {
remoteException = new RemoteTransportException("remote failure, coming from local node", t);
} else {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.writeException(t);
remoteException = new RemoteTransportException("remote failure", output.bytes().streamInput().readException());
} catch (IOException ioException) {
throw new ElasticsearchException("failed to serialize/deserialize supplied exception " + t, ioException);
}
}
this.handleError(requestId, remoteException);
}
/**
* simulate an error for the given requestId, unlike
* {@link #handleLocalError(long, Throwable)} and
* {@link #handleRemoteError(long, Throwable)}, the provided
* exception will not be wrapped but will be delivered to the
* transport layer as is
*
* @param requestId the id corresponding to the captured send
* request
* @param e the failure
*/
public void handleError(final long requestId, final TransportException e) {
responseHandlers.onResponseReceived(requestId, listener).handleException(e);
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
return true;
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
@Override
public BoundTransportAddress boundAddress() {
return null;
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return null;
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
return new TransportAddress[0];
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public void close() {
}
@Override
public List<String> getLocalAddresses() {
return Collections.emptyList();
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
if (requestHandlers.containsKey(reg.getAction())) {
throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
}
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}
}
@Override
public ResponseHandlers getResponseHandlers() {
return responseHandlers;
}
@Override
public RequestHandlerRegistry getRequestHandler(String action) {
return requestHandlers.get(action);
}
@Override
public void addMessageListener(TransportMessageListener listener) {
if (this.listener != null) {
throw new IllegalStateException("listener already set");
}
this.listener = listener;
}
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
if (listener == this.listener) {
this.listener = null;
return true;
}
return false;
super.onSendRequest(requestId, action, request, node);
capturedRequests.add(new CapturingTransport.CapturedRequest(node, requestId, action, request));
}
}

View File

@ -0,0 +1,271 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import static org.apache.lucene.util.LuceneTestCase.rarely;
/**
* A basic transport implementation that allows to intercept requests that have been sent
*/
public class MockTransport implements Transport, LifecycleComponent {
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportMessageListener listener;
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
public TransportService createTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> nodeConnected(discoveryNode));
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> openConnection(discoveryNode, null));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
}
/**
* simulate a response for the given requestId
*/
@SuppressWarnings("unchecked")
public void handleResponse(final long requestId, final TransportResponse response) {
responseHandlers.onResponseReceived(requestId, listener).handleResponse(response);
}
/**
* simulate a local error for the given requestId, will be wrapped
* by a {@link SendRequestTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleLocalError(final long requestId, final Throwable t) {
Tuple<DiscoveryNode, String> request = requests.get(requestId);
assert request != null;
this.handleError(requestId, new SendRequestTransportException(request.v1(), request.v2(), t));
}
/**
* simulate a remote error for the given requestId, will be wrapped
* by a {@link RemoteTransportException}
*
* @param requestId the id corresponding to the captured send
* request
* @param t the failure to wrap
*/
public void handleRemoteError(final long requestId, final Throwable t) {
final RemoteTransportException remoteException;
if (rarely(Randomness.get())) {
remoteException = new RemoteTransportException("remote failure, coming from local node", t);
} else {
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.writeException(t);
remoteException = new RemoteTransportException("remote failure", output.bytes().streamInput().readException());
} catch (IOException ioException) {
throw new ElasticsearchException("failed to serialize/deserialize supplied exception " + t, ioException);
}
}
this.handleError(requestId, remoteException);
}
/**
* simulate an error for the given requestId, unlike
* {@link #handleLocalError(long, Throwable)} and
* {@link #handleRemoteError(long, Throwable)}, the provided
* exception will not be wrapped but will be delivered to the
* transport layer as is
*
* @param requestId the id corresponding to the captured send
* request
* @param e the failure
*/
public void handleError(final long requestId, final TransportException e) {
responseHandlers.onResponseReceived(requestId, listener).handleException(e);
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(node, action));
onSendRequest(requestId, action, request, node);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
}
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
}
protected boolean nodeConnected(DiscoveryNode discoveryNode) {
return true;
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
@Override
public BoundTransportAddress boundAddress() {
return null;
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return null;
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
return new TransportAddress[0];
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public void close() {
}
@Override
public List<String> getLocalAddresses() {
return Collections.emptyList();
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
if (requestHandlers.containsKey(reg.getAction())) {
throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
}
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}
}
@Override
public ResponseHandlers getResponseHandlers() {
return responseHandlers;
}
@SuppressWarnings("unchecked")
@Override
public RequestHandlerRegistry<TransportRequest> getRequestHandler(String action) {
return requestHandlers.get(action);
}
@Override
public void addMessageListener(TransportMessageListener listener) {
if (this.listener != null) {
throw new IllegalStateException("listener already set");
}
this.listener = listener;
}
@Override
public boolean removeMessageListener(TransportMessageListener listener) {
if (listener == this.listener) {
this.listener = null;
return true;
}
return false;
}
}

View File

@ -0,0 +1,362 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
public class DisruptableMockTransportTests extends ESTestCase {
DiscoveryNode node1;
DiscoveryNode node2;
DisruptableMockTransport transport1;
DisruptableMockTransport transport2;
TransportService service1;
TransportService service2;
DeterministicTaskQueue deterministicTaskQueue;
Set<Tuple<DiscoveryNode, DiscoveryNode>> disconnectedLinks;
Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedLinks;
ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
Tuple<DiscoveryNode, DiscoveryNode> link = Tuple.tuple(sender, destination);
if (disconnectedLinks.contains(link)) {
assert blackholedLinks.contains(link) == false;
return ConnectionStatus.DISCONNECTED;
}
if (blackholedLinks.contains(link)) {
return ConnectionStatus.BLACK_HOLE;
}
return ConnectionStatus.CONNECTED;
}
@Before
public void initTransports() {
node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);
List<DiscoveryNode> discoNodes = new ArrayList<>();
discoNodes.add(node1);
discoNodes.add(node2);
disconnectedLinks = new HashSet<>();
blackholedLinks = new HashSet<>();
List<DisruptableMockTransport> transports = new ArrayList<>();
deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build());
transport1 = new DisruptableMockTransport(logger) {
@Override
protected DiscoveryNode getLocalNode() {
return node1;
}
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(sender, destination);
}
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination) {
int index = discoNodes.indexOf(destination);
if (index == -1) {
return Optional.empty();
} else {
return Optional.of(transports.get(index));
}
}
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
deterministicTaskQueue.scheduleNow(doDelivery);
}
};
transport2 = new DisruptableMockTransport(logger) {
@Override
protected DiscoveryNode getLocalNode() {
return node2;
}
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(sender, destination);
}
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode destination) {
int index = discoNodes.indexOf(destination);
if (index == -1) {
return Optional.empty();
} else {
return Optional.of(transports.get(index));
}
}
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, Runnable doDelivery) {
deterministicTaskQueue.scheduleNow(doDelivery);
}
};
transports.add(transport1);
transports.add(transport2);
service1 = transport1.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(),
NOOP_TRANSPORT_INTERCEPTOR, a -> node1, null, Collections.emptySet());
service2 = transport2.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(),
NOOP_TRANSPORT_INTERCEPTOR, a -> node2, null, Collections.emptySet());
service1.start();
service2.start();
}
private TransportRequestHandler<TransportRequest.Empty> requestHandlerShouldNotBeCalled() {
return (request, channel, task) -> {
throw new AssertionError("should not be called");
};
}
private TransportRequestHandler<TransportRequest.Empty> requestHandlerRepliesNormally() {
return (request, channel, task) -> {
logger.debug("got a dummy request, replying normally...");
channel.sendResponse(TransportResponse.Empty.INSTANCE);
};
}
private TransportRequestHandler<TransportRequest.Empty> requestHandlerRepliesExceptionally(Exception e) {
return (request, channel, task) -> {
logger.debug("got a dummy request, replying exceptionally...");
channel.sendResponse(e);
};
}
private TransportRequestHandler<TransportRequest.Empty> requestHandlerCaptures(Consumer<TransportChannel> channelConsumer) {
return (request, channel, task) -> {
logger.debug("got a dummy request...");
channelConsumer.accept(channel);
};
}
private TransportResponseHandler<TransportResponse> responseHandlerShouldNotBeCalled() {
return new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("should not be called");
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
}
private TransportResponseHandler<TransportResponse> responseHandlerShouldBeCalledNormally(Runnable onCalled) {
return new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(TransportResponse response) {
onCalled.run();
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("should not be called");
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
}
private TransportResponseHandler<TransportResponse> responseHandlerShouldBeCalledExceptionally(Consumer<TransportException> onCalled) {
return new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(TransportResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
onCalled.accept(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
}
private void registerRequestHandler(TransportService transportService, TransportRequestHandler<TransportRequest.Empty> handler) {
transportService.registerRequestHandler("internal:dummy", () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.GENERIC, handler);
}
private void send(TransportService transportService, DiscoveryNode destinationNode,
TransportResponseHandler<TransportResponse> responseHandler) {
transportService.sendRequest(destinationNode, "internal:dummy", TransportRequest.Empty.INSTANCE, responseHandler);
}
public void testSuccessfulResponse() {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
registerRequestHandler(service2, requestHandlerRepliesNormally());
AtomicBoolean responseHandlerCalled = new AtomicBoolean();
send(service1, node2, responseHandlerShouldBeCalledNormally(() -> responseHandlerCalled.set(true)));
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(responseHandlerCalled.get());
}
public void testExceptionalResponse() {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
Exception e = new Exception("dummy exception");
registerRequestHandler(service2, requestHandlerRepliesExceptionally(e));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
deterministicTaskQueue.runAllRunnableTasks();
assertNotNull(responseHandlerException.get());
assertNotNull(responseHandlerException.get().getCause());
assertThat(responseHandlerException.get().getCause().getMessage(), containsString("dummy exception"));
}
public void testDisconnectedOnRequest() {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
registerRequestHandler(service2, requestHandlerShouldNotBeCalled());
disconnectedLinks.add(Tuple.tuple(node1, node2));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
deterministicTaskQueue.runAllRunnableTasks();
assertNotNull(responseHandlerException.get());
assertThat(responseHandlerException.get().getMessage(), containsString("disconnected"));
}
public void testUnavailableOnRequest() {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
registerRequestHandler(service2, requestHandlerShouldNotBeCalled());
blackholedLinks.add(Tuple.tuple(node1, node2));
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
}
public void testDisconnectedOnSuccessfulResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get());
disconnectedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
deterministicTaskQueue.runAllRunnableTasks();
}
public void testDisconnectedOnExceptionalResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get());
disconnectedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(new Exception());
deterministicTaskQueue.runAllRunnableTasks();
}
public void testUnavailableOnSuccessfulResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get());
blackholedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
deterministicTaskQueue.runAllRunnableTasks();
}
public void testUnavailableOnExceptionalResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldNotBeCalled());
deterministicTaskQueue.runAllRunnableTasks();
assertNull(responseHandlerException.get());
assertNotNull(responseHandlerChannel.get());
blackholedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(new Exception());
deterministicTaskQueue.runAllRunnableTasks();
}
}