Transport: shortcut local execution

In several places in the code we need to notify a node it needs to do something (typically the master). When that node is the local node, we have an optimization in serveral places that runs the execution code immediately instead of sending the request through the wire to itself. This is a shame as we need to implement the same pattern again and again. On top of that we may forget (see note bellow) to do so and we might have to write some craft if the code need to run under another thread pool.

This commit folds the optimization in the TrasnportService, shortcutting wire serliazition if the target node is local.

Note: this was discovered by #10247 which tries to import a dangling index quickly after the cluster forms. When sending an import dangling request to master, the code didn't take into account that fact that the local node may master. If this happens quickly enough, one would get a NodeNotConnected exception causing the dangling indices not to be imported. This will succeed after 10s where InternalClusterService.ReconnectToNodes runs and actively connects the local node to itself (which is not needed), potentially after another cluster state update.

Closes #10350
This commit is contained in:
Boaz Leskes 2015-03-31 14:44:01 +02:00
parent a24d898b3a
commit 80e86e5719
7 changed files with 288 additions and 120 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.action.index;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -32,7 +31,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -76,44 +74,23 @@ public class NodeIndexDeletedAction extends AbstractComponent {
public void nodeIndexDeleted(final ClusterState clusterState, final String index, final Settings indexSettings, final String nodeId) throws ElasticsearchException {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
@Override
protected void doRun() throws Exception {
innerNodeIndexDeleted(index, nodeId);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
if (nodes.localNode().isDataNode() == false) {
logger.trace("[{}] not acking store deletion (not a data node)");
return;
}
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
}
});
}
private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, Settings indexSettings) throws IOException {
@ -123,30 +100,14 @@ public class NodeIndexDeletedAction extends AbstractComponent {
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(new Index(index), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}
}
private void innerNodeIndexDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(index, nodeId);
}
}
private void innerNodeIndexStoreDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(index, nodeId);
}
}
public static interface Listener {
public interface Listener {
void onNodeIndexDeleted(String index, String nodeId);
void onNodeIndexStoreDeleted(String index, String nodeId);
@ -161,7 +122,9 @@ public class NodeIndexDeletedAction extends AbstractComponent {
@Override
public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexDeleted(message.index, message.nodeId);
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(message.index, message.nodeId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -180,7 +143,9 @@ public class NodeIndexDeletedAction extends AbstractComponent {
@Override
public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexStoreDeleted(message.index, message.nodeId);
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(message.index, message.nodeId);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -61,17 +61,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
return;
}
if (nodes.localNodeMaster()) {
innerMappingRefresh(request);
} else {
transportService.sendRequest(nodes.masterNode(),
ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerMappingRefresh(NodeMappingRefreshRequest request) {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
transportService.sendRequest(nodes.masterNode(), ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
private class NodeMappingRefreshTransportHandler extends BaseTransportRequestHandler<NodeMappingRefreshRequest> {
@ -83,7 +73,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
innerMappingRefresh(request);
metaDataMappingService.refreshMapping(request.index(), request.indexUUID(), request.types());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -92,17 +92,13 @@ public class ShardStateAction extends AbstractComponent {
private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
if (clusterService.localNode().equals(masterNode)) {
innerShardFailed(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
}
});
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) throws ElasticsearchException {
@ -120,20 +116,17 @@ public class ShardStateAction extends AbstractComponent {
logger.debug("sending shard started for {}", shardRoutingEntry);
if (clusterService.localNode().equals(masterNode)) {
innerShardStarted(shardRoutingEntry);
} else {
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}
private void innerShardFailed(final ShardRoutingEntry shardRoutingEntry) {
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
failedShardQueue.add(shardRoutingEntry);
clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@ -196,7 +189,7 @@ public class ShardStateAction extends AbstractComponent {
});
}
private void innerShardStarted(final ShardRoutingEntry shardRoutingEntry) {
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
@ -303,7 +296,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardFailed(request);
handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@ -322,7 +315,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
innerShardStarted(request);
shardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -156,7 +156,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
DiscoveryNode localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build();
this.transportService.setLocalNode(localNode);
}
@Override

View File

@ -431,12 +431,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
public final static class RestoreCompletionResponse {
@ -462,7 +458,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
*
* @param request update shard status request
*/
private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
private void updateRestoreStateOnMaster(final UpdateIndexShardRestoreStatusRequest request) {
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
private RestoreInfo restoreInfo = null;
@ -657,7 +653,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (shardsToFail != null) {
for (ShardId shardId : shardsToFail) {
logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId);
innerUpdateRestoreState(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
}
}
}
@ -671,12 +667,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private boolean failed(Snapshot snapshot, String index) {
@ -1001,7 +993,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
@Override
public void messageReceived(UpdateIndexShardRestoreStatusRequest request, final TransportChannel channel) throws Exception {
innerUpdateRestoreState(request);
updateRestoreStateOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -36,13 +36,11 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -94,6 +92,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
volatile String[] tracelLogExclude;
private final ApplySettings settingsListener = new ApplySettings();
/** if set will call requests sent to this id to shortcut and executed locally */
volatile String localNodeId = null;
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool);
}
@ -109,6 +110,21 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
adapter = createAdapter();
}
/**
* makes the transport service aware of the local node. this allows it to optimize requests sent
* from the local node to it self and by pass the network stack/ serialization
*
* @param localNode
*/
public void setLocalNode(DiscoveryNode localNode) {
localNodeId = localNode.id();
}
// for testing
String getLocalNodeId() {
return localNodeId;
}
protected Adapter createAdapter() {
return new Adapter();
}
@ -209,18 +225,27 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public boolean nodeConnected(DiscoveryNode node) {
return transport.nodeConnected(node);
return node.id().equals(localNodeId) || transport.nodeConnected(node);
}
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
if (node.id().equals(localNodeId)) {
return;
}
transport.connectToNode(node);
}
public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
if (node.id().equals(localNodeId)) {
return;
}
transport.connectToNodeLight(node);
}
public void disconnectFromNode(DiscoveryNode node) {
if (node.id().equals(localNodeId)) {
return;
}
transport.disconnectFromNode(node);
}
@ -273,7 +298,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
transport.sendRequest(node, requestId, action, request, options);
if (node.id().equals(localNodeId)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
} catch (final Throwable e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
@ -294,6 +323,53 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
private void sendLocalRequest(long requestId, final String action, final TransportRequest request) {
final DirectResponseChannel channel = new DirectResponseChannel(action, requestId, adapter, threadPool);
try {
final TransportRequestHandler handler = adapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final String executor = handler.executor();
if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked
handler.messageReceived(request, channel);
} else {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
//noinspection unchecked
handler.messageReceived(request, channel);
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("failed to notify channel of error message for action [" + action + "]", e1);
logger.warn("actual exception", e);
}
}
});
}
} catch (Throwable e) {
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("failed to notify channel of error message for action [" + action + "]", e1);
logger.warn("actual exception", e1);
}
}
}
private boolean shouldTraceAction(String action) {
if (tracerLogInclude.length > 0) {
if (Regex.simpleMatch(tracerLogInclude, action) == false) {
@ -609,4 +685,89 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
}
}
static class DirectResponseChannel implements TransportChannel {
final private String action;
final private long requestId;
final TransportServiceAdapter adapter;
final ThreadPool threadPool;
public DirectResponseChannel(String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) {
this.action = action;
this.requestId = requestId;
this.adapter = adapter;
this.threadPool = threadPool;
}
@Override
public String action() {
return action;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
}
@Override
public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
final String executor = handler.executor();
if (ThreadPool.Names.SAME.equals(executor)) {
processResponse(handler, response);
} else {
threadPool.executor(executor).execute(new Runnable() {
@SuppressWarnings({"unchecked"})
@Override
public void run() {
processResponse(handler, response);
}
});
}
}
}
protected void processResponse(TransportResponseHandler handler, TransportResponse response) {
try {
handler.handleResponse(response);
} catch (Throwable e) {
handler.handleException(new ResponseHandlerFailureTransportException(e));
}
}
@Override
public void sendResponse(Throwable error) throws IOException {
final TransportResponseHandler handler = adapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (!(error instanceof RemoteTransportException)) {
error = new RemoteTransportException(error.getMessage(), error);
}
final RemoteTransportException rtx = (RemoteTransportException) error;
final String executor = handler.executor();
if (ThreadPool.Names.SAME.equals(executor)) {
processException(handler, rtx);
} else {
threadPool.executor(handler.executor()).execute(new Runnable() {
@SuppressWarnings({"unchecked"})
@Override
public void run() {
processException(handler, rtx);
}
});
}
}
}
protected void processException(final TransportResponseHandler handler, final RemoteTransportException rtx) {
try {
handler.handleException(rtx);
} catch (Throwable e) {
handler.handleException(new ResponseHandlerFailureTransportException(e));
}
}
}
}

View File

@ -80,7 +80,8 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
// wait till all nodes are properly connected and the event has been sent, so tests in this class
// will not get this callback called on the connections done in this setup
final CountDownLatch latch = new CountDownLatch(4);
final boolean useLocalNode = randomBoolean();
final CountDownLatch latch = new CountDownLatch(useLocalNode ? 2 : 4);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
@ -95,10 +96,18 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
serviceA.addConnectionListener(waitForConnection);
serviceB.addConnectionListener(waitForConnection);
if (useLocalNode) {
logger.info("--> using local node optimization");
serviceA.setLocalNode(nodeA);
serviceB.setLocalNode(nodeB);
} else {
logger.info("--> actively connecting to local node");
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
}
serviceA.connectToNode(nodeB);
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
assertThat("failed to wait for all nodes to connect", latch.await(5, TimeUnit.SECONDS), equalTo(true));
serviceA.removeConnectionListener(waitForConnection);
@ -204,6 +213,64 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
serviceA.removeHandler("sayHello");
}
@Test
public void testLocalNodeConnection() throws InterruptedException {
assertTrue("serviceA is not connected to nodeA", serviceA.nodeConnected(nodeA));
if (((TransportService) serviceA).getLocalNodeId() != null) {
// this should be a noop
serviceA.disconnectFromNode(nodeA);
}
final AtomicReference<Exception> exception = new AtomicReference<>();
serviceA.registerHandler("localNode", new BaseTransportRequestHandler<StringMessageRequest>() {
@Override
public StringMessageRequest newInstance() {
return new StringMessageRequest();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
try {
channel.sendResponse(new StringMessageResponse(request.message));
} catch (IOException e) {
exception.set(e);
}
}
});
final AtomicReference<String> responseString = new AtomicReference<>();
final CountDownLatch responseLatch = new CountDownLatch(1);
serviceA.sendRequest(nodeA, "localNode", new StringMessageRequest("test"), new TransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse newInstance() {
return new StringMessageResponse();
}
@Override
public void handleResponse(StringMessageResponse response) {
responseString.set(response.message);
responseLatch.countDown();
}
@Override
public void handleException(TransportException exp) {
exception.set(exp);
responseLatch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
responseLatch.await();
assertNull(exception.get());
assertThat(responseString.get(), equalTo("test"));
}
@Test
public void testVoidMessageCompressed() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<TransportRequest.Empty>() {
@ -367,7 +434,7 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
res.txGet();
fail("exception should be thrown");
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
assertThat(e.getCause().getMessage(), equalTo("bad message !!!"));
}
serviceA.removeHandler("sayHelloException");