Remove setLocalNode from ClusterService and TransportService (#22608)

ClusterService and TransportService expect the local discovery node to be set
before they are started but this requires manual interaction and is error prone since
to work absolutely correct they should share the same instance (same ephemeral ID).

TransportService also has 2 modes of operation, mainly realted to transport client vs. internal
to a node. This change removes the mode where we don't maintain a local node and uses a dummy local
node in the transport client since we don't bind to any port in such a case.

Local discovery node instances are now managed by the node itself and only suppliers and factories that allow
creation only once are passed to TransportService and ClusterService.
This commit is contained in:
Simon Willnauer 2017-01-13 16:12:27 +01:00 committed by GitHub
parent d5fa84f869
commit 4c1ee018f6
35 changed files with 185 additions and 150 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module;
@ -172,7 +173,9 @@ public abstract class TransportClient extends AbstractClient {
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), null);
networkModule.getTransportInterceptor(),
boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
UUIDs.randomBase64UUID()), null);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);

View File

@ -202,7 +202,7 @@ public class DiscoveryNode implements Writeable, ToXContent {
roles.add(DiscoveryNode.Role.DATA);
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress,attributes, roles, Version.CURRENT);
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
}
/**

View File

@ -102,6 +102,7 @@ public class ClusterService extends AbstractLifecycleComponent {
public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
private final ThreadPool threadPool;
private final ClusterName clusterName;
private final Supplier<DiscoveryNode> localNodeSupplier;
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
@ -140,8 +141,9 @@ public class ClusterService extends AbstractLifecycleComponent {
private DiscoverySettings discoverySettings;
public ClusterService(Settings settings,
ClusterSettings clusterSettings, ThreadPool threadPool) {
ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<DiscoveryNode> localNodeSupplier) {
super(settings);
this.localNodeSupplier = localNodeSupplier;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
@ -167,14 +169,6 @@ public class ClusterService extends AbstractLifecycleComponent {
clusterStatePublisher = publisher;
}
public synchronized void setLocalNode(DiscoveryNode localNode) {
assert state().nodes().getLocalNodeId() == null : "local node is already set";
updateState(clusterState -> {
DiscoveryNodes nodes = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId()).build();
return ClusterState.builder(clusterState).nodes(nodes).build();
});
}
private void updateState(UnaryOperator<ClusterState> updateFunction) {
this.state.getAndUpdate(updateFunction);
}
@ -214,11 +208,16 @@ public class ClusterService extends AbstractLifecycleComponent {
@Override
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(state().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Objects.requireNonNull(discoverySettings, "please set discovery settings before starting");
addListener(localNodeMasterListeners);
updateState(state -> ClusterState.builder(state).blocks(initialBlocks).build());
DiscoveryNode localNode = localNodeSupplier.get();
assert localNode != null;
updateState(state -> {
assert state.nodes().getLocalNodeId() == null : "local node is already set";
DiscoveryNodes nodes = DiscoveryNodes.builder(state.nodes()).add(localNode).localNodeId(localNode.getId()).build();
return ClusterState.builder(state).nodes(nodes).blocks(initialBlocks).build();
});
this.threadPoolExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
threadPool.getThreadContext());
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.node;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
@ -213,6 +214,7 @@ public class Node implements Closeable {
private final PluginsService pluginsService;
private final NodeClient client;
private final Collection<LifecycleComponent> pluginLifecycleComponents;
private final LocalNodeFactory localNodeFactory;
/**
* Constructs a node with the given settings.
@ -249,7 +251,6 @@ public class Node implements Closeable {
} catch (IOException ex) {
throw new IllegalStateException("Failed to created node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
Logger logger = Loggers.getLogger(Node.class, tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
@ -285,6 +286,8 @@ public class Node implements Closeable {
this.pluginsService = new PluginsService(tmpSettings, environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
// create the environment based on the finalized (processed) view of the settings
// this is just to makes sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings);
@ -319,7 +322,8 @@ public class Node implements Closeable {
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(settings,
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
localNodeFactory::getNode);
clusterService.addListener(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
@ -389,7 +393,7 @@ public class Node implements Closeable {
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), settingsModule.getClusterSettings());
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final Consumer<Binder> httpBind;
if (networkModule.isHttpEnabled()) {
HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
@ -489,8 +493,10 @@ public class Node implements Closeable {
}
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor, ClusterSettings clusterSettings) {
return new TransportService(settings, transport, threadPool, interceptor, clusterSettings);
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
}
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
@ -576,17 +582,13 @@ public class Node implements Closeable {
validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress(), pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
DiscoveryNode localNode = DiscoveryNode.createLocal(settings,
transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class).nodeId());
// TODO: need to find a cleaner way to start/construct a service with some initial parameters,
// playing nice with the life cycle interfaces
clusterService.setLocalNode(localNode);
transportService.setLocalNode(localNode);
clusterService.addStateApplier(transportService.getTaskManager());
clusterService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
// start after cluster service so the local disco is known
discovery.start();
transportService.acceptIncomingRequests();
@ -886,4 +888,26 @@ public class Node implements Closeable {
ThreadPool threadPool, NodeClient client) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
}
private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
private final SetOnce<DiscoveryNode> localNode = new SetOnce<>();
private final String persistentNodeId;
private final Settings settings;
private LocalNodeFactory(Settings settings, String persistentNodeId) {
this.persistentNodeId = persistentNodeId;
this.settings = settings;
}
@Override
public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
return localNode.get();
}
DiscoveryNode getNode() {
assert localNode.get() != null;
return localNode.get();
}
}
}

View File

@ -80,6 +80,7 @@ public class TransportService extends AbstractLifecycleComponent {
protected final ClusterName clusterName;
protected final TaskManager taskManager;
private final TransportInterceptor.AsyncSender asyncSender;
private final Function<BoundTransportAddress, DiscoveryNode> localNodeFactory;
volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object();
@ -143,10 +144,11 @@ public class TransportService extends AbstractLifecycleComponent {
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
@Nullable ClusterSettings clusterSettings) {
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
@ -162,15 +164,9 @@ public class TransportService extends AbstractLifecycleComponent {
}
/**
* makes the transport service aware of the local node. this allows it to optimize requests sent
* from the local node to it self and by pass the network stack/ serialization
* Returns the local node representation
*/
public void setLocalNode(DiscoveryNode localNode) {
this.localNode = localNode;
}
// for testing
DiscoveryNode getLocalNode() {
public DiscoveryNode getLocalNode() {
return localNode;
}
@ -200,18 +196,20 @@ public class TransportService extends AbstractLifecycleComponent {
adapter.txMetric.clear();
transport.transportServiceAdapter(adapter);
transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
}
}
localNode = localNodeFactory.apply(transport.boundAddress());
registerRequestHandler(
HANDSHAKE_ACTION_NAME,
() -> HandshakeRequest.INSTANCE,
ThreadPool.Names.SAME,
(request, channel) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode != null ? localNode.getVersion() : Version.CURRENT)));
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}
@Override
@ -294,7 +292,7 @@ public class TransportService extends AbstractLifecycleComponent {
* Returns <code>true</code> iff the given node is already connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(localNode) || transport.nodeConnected(node);
return isLocalNode(node) || transport.nodeConnected(node);
}
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
@ -308,7 +306,7 @@ public class TransportService extends AbstractLifecycleComponent {
* @param connectionProfile the connection profile to use when connecting to this node
*/
public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
if (node.equals(localNode)) {
if (isLocalNode(node)) {
return;
}
transport.connectToNode(node, connectionProfile);
@ -321,7 +319,7 @@ public class TransportService extends AbstractLifecycleComponent {
* @param profile the connection profile to use
*/
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
if (node.equals(localNode)) {
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return transport.openConnection(node, profile);
@ -362,7 +360,7 @@ public class TransportService extends AbstractLifecycleComponent {
if (!Objects.equals(clusterName, response.clusterName)) {
throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node);
} else if (response.version.isCompatible((localNode != null ? localNode.getVersion() : Version.CURRENT)) == false) {
} else if (response.version.isCompatible(localNode.getVersion()) == false) {
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
}
@ -410,7 +408,7 @@ public class TransportService extends AbstractLifecycleComponent {
}
public void disconnectFromNode(DiscoveryNode node) {
if (node.equals(localNode)) {
if (isLocalNode(node)) {
return;
}
transport.disconnectFromNode(node);
@ -481,7 +479,7 @@ public class TransportService extends AbstractLifecycleComponent {
* @throws NodeNotConnectedException if the given node is not connected
*/
public Transport.Connection getConnection(DiscoveryNode node) {
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return transport.getConnection(node);
@ -1136,4 +1134,8 @@ public class TransportService extends AbstractLifecycleComponent {
return "direct";
}
}
private boolean isLocalNode(DiscoveryNode discoveryNode) {
return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
}
}

View File

@ -174,7 +174,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
new NetworkService(settings, Collections.emptyList())),
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null) {
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {

View File

@ -88,7 +88,8 @@ public class TransportBulkActionTookTests extends ESTestCase {
private TransportBulkAction createAction(boolean controlled, AtomicLong expected) {
CapturingTransport capturingTransport = new CapturingTransport();
TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);

View File

@ -121,7 +121,7 @@ public class MainActionTests extends ESTestCase {
when(clusterService.state()).thenReturn(state);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
x -> null, null);
TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), transportService, mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class), clusterService);
AtomicReference<MainResponse> responseRef = new AtomicReference<>();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
@ -48,7 +49,7 @@ import static org.mockito.Mockito.when;
public class TransportMultiSearchActionTests extends ESTestCase {
public void testBatchExecute() throws Exception {
// Initialize depedencies of TransportMultiSearchAction
// Initialize dependencies of TransportMultiSearchAction
Settings settings = Settings.builder()
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
.build();
@ -57,7 +58,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
ThreadPool threadPool = new ThreadPool(settings);
TaskManager taskManager = mock(TaskManager.class);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null) {
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) {
@Override
public TaskManager getTaskManager() {
return taskManager;

View File

@ -191,7 +191,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
setClusterState(clusterService, TEST_INDEX);

View File

@ -87,7 +87,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(),

View File

@ -182,7 +182,7 @@ public class TransportNodesActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
int numNodes = randomIntBetween(3, 10);

View File

@ -96,7 +96,7 @@ public class BroadcastReplicationTests extends ESTestCase {
new NetworkService(Settings.EMPTY, Collections.emptyList()));
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService,

View File

@ -155,7 +155,7 @@ public class TransportReplicationActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);

View File

@ -163,8 +163,8 @@ public class TransportWriteActionTests extends ESTestCase {
}
protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(Settings.EMPTY, "test",
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null,
null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null), null,
null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;

View File

@ -144,7 +144,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(

View File

@ -113,8 +113,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
fail("takes way too long to get the cluster state");
}
assertThat(client.connectedNodes().size(), is(1));
assertThat(client.connectedNodes().get(0).getAddress(), is(transportService.boundAddress().publishAddress()));
assertEquals(1, client.connectedNodes().size());
assertEquals(client.connectedNodes().get(0).getAddress(), transportService.boundAddress().publishAddress());
}
}
@ -165,7 +165,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
ClusterName cluster1 = new ClusterName("cluster1");
ClusterState.Builder builder = ClusterState.builder(cluster1);
//the sniffer detects only data nodes
builder.nodes(DiscoveryNodes.builder().add(new DiscoveryNode("node_id", address, Collections.emptyMap(),
builder.nodes(DiscoveryNodes.builder().add(new DiscoveryNode("node_id", "someId", "some_ephemeralId_id",
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT)));
((TransportResponseHandler<ClusterStateResponse>) handler)
.handleResponse(new ClusterStateResponse(cluster1, builder.build()));

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
@ -143,6 +144,9 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
};
}
}, (addr) -> {
assert addr == null : "boundAddress: " + addr;
return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID());
}, null);
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
@ -39,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocatio
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -58,7 +61,8 @@ import java.util.function.Supplier;
public class ClusterModuleTests extends ModuleTestCase {
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () ->
new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), Version.CURRENT));
static class FakeAllocationDecider extends AllocationDecider {
protected FakeAllocationDecider(Settings settings) {
super(settings);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
@ -151,7 +152,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
this.transport = new MockTransport();
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null);
transportService.start();
transportService.acceptIncomingRequests();
}

View File

@ -108,7 +108,7 @@ public class ShardStateActionTests extends ESTestCase {
this.transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);

View File

@ -96,7 +96,7 @@ public class ClusterStateHealthTests extends ESTestCase {
super.setUp();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
}

View File

@ -122,8 +122,7 @@ public class ClusterServiceTests extends ESTestCase {
TimedClusterService createTimedClusterService(boolean makeMaster) throws InterruptedException {
TimedClusterService timedClusterService = new TimedClusterService(Settings.builder().put("cluster.name",
"ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
threadPool, () -> new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT));
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
@ -1055,8 +1054,7 @@ public class ClusterServiceTests extends ESTestCase {
public void testDisconnectFromNewlyAddedNodesIfClusterStatePublishingFails() throws InterruptedException {
TimedClusterService timedClusterService = new TimedClusterService(Settings.builder().put("cluster.name",
"ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
threadPool, () -> new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT));
Set<DiscoveryNode> currentNodes = new HashSet<>();
timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@ -1273,8 +1271,9 @@ public class ClusterServiceTests extends ESTestCase {
public volatile Long currentTimeOverride = null;
public TimedClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
public TimedClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Supplier<DiscoveryNode> localNodeSupplier) {
super(settings, clusterSettings, threadPool, localNodeSupplier);
}
@Override

View File

@ -422,7 +422,7 @@ public class UnicastZenPingTests extends ESTestCase {
Version.CURRENT);
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
closeables.push(transportService);
final int limitPortCounts = randomIntBetween(1, 10);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
@ -465,7 +465,7 @@ public class UnicastZenPingTests extends ESTestCase {
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
@ -515,7 +515,7 @@ public class UnicastZenPingTests extends ESTestCase {
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
closeables.push(transportService);
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
try {
@ -703,7 +703,7 @@ public class UnicastZenPingTests extends ESTestCase {
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
executorService,
@ -751,7 +751,8 @@ public class UnicastZenPingTests extends ESTestCase {
.build();
final Transport transport = supplier.apply(nodeSettings, version);
final MockTransportService transportService =
new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new MockTransportService(nodeSettings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress ->
new DiscoveryNode(nodeId, nodeId, boundAddress.publishAddress(), emptyMap(), nodeRoles, version), null);
transportService.start();
transportService.acceptIncomingRequests();
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
@ -762,10 +763,7 @@ public class UnicastZenPingTests extends ESTestCase {
counters.get(node.getAddress()).incrementAndGet();
}
});
final DiscoveryNode node =
new DiscoveryNode(nodeId, nodeId, transportService.boundAddress().publishAddress(), emptyMap(), nodeRoles, version);
transportService.setLocalNode(node);
return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters);
return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, transportService.getLocalNode(), counters);
}
private static class NetworkHandle {

View File

@ -179,9 +179,8 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
DiscoveryNode masterNode = masterTransport.getLocalNode();
toClose.addFirst(masterTransport);
masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode);
// build the zen discovery and cluster service
ClusterService masterClusterService = createClusterService(threadPool, masterNode);
@ -196,8 +195,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
final MockTransportService otherTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
otherTransport.start();
toClose.addFirst(otherTransport);
DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT);
otherTransport.setLocalNode(otherNode);
DiscoveryNode otherNode = otherTransport.getLocalNode();
final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName())
.nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build();
ClusterService otherClusterService = createClusterService(threadPool, masterNode);
@ -248,9 +246,8 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
try {
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
DiscoveryNode masterNode = masterTransport.getLocalNode();
toClose.addFirst(masterTransport);
masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, null, masterNode);
// build the zen discovery and cluster service
ClusterService masterClusterService = createClusterService(threadPool, masterNode);

View File

@ -19,7 +19,10 @@
package org.elasticsearch.gateway;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -34,7 +37,7 @@ public class GatewayServiceTests extends ESTestCase {
private GatewayService createService(Settings.Builder settings) {
ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null);
null, () -> new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), Version.CURRENT));
return new GatewayService(settings.build(),
null, clusterService, null, null, null, new NoopDiscovery(), null);
}

View File

@ -82,7 +82,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(settings, Collections.emptyList()));
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null);
indicesService = getInstanceFromNode(IndicesService.class);
shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
actionFilters = new ActionFilters(Collections.emptySet());

View File

@ -57,7 +57,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);

View File

@ -52,6 +52,7 @@ import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
@ -61,6 +62,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -155,7 +157,8 @@ public class ClusterStateChanges extends AbstractComponent {
// services
TransportService transportService = new TransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings);
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), clusterSettings);
MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry, null, null) {
// metaData upgrader should do nothing
@Override

View File

@ -368,7 +368,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final MockIndicesService indicesService = indicesServiceSupplier.get();
final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build();
final TransportService transportService = new TransportService(settings, null, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null);
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null);

View File

@ -67,20 +67,17 @@ public class TransportServiceHandshakeTests extends ESTestCase {
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(settings, Collections.emptyList()));
TransportService transportService = new MockTransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, (boundAddress) -> new DiscoveryNode(
nodeNameAndId,
nodeNameAndId,
boundAddress.publishAddress(),
emptyMap(),
emptySet(),
version), null);
transportService.start();
transportService.acceptIncomingRequests();
DiscoveryNode node =
new DiscoveryNode(
nodeNameAndId,
nodeNameAndId,
transportService.boundAddress().publishAddress(),
emptyMap(),
emptySet(),
version);
transportService.setLocalNode(node);
transportServices.add(transportService);
return new NetworkHandle(transportService, node);
return new NetworkHandle(transportService, transportService.getLocalNode());
}
@After

View File

@ -22,9 +22,11 @@ package org.elasticsearch.node;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
@ -46,6 +48,7 @@ import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import java.util.Collection;
import java.util.function.Function;
/**
* A node for testing which allows:
@ -90,15 +93,17 @@ public class MockNode extends Node {
@Override
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor, ClusterSettings clusterSettings) {
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings) {
// we use the MockTransportService.TestPlugin class as a marker to create a network
// module with this MockNetworkService. NetworkService is such an integral part of the systme
// we don't allow to plug it in from plugins or anything. this is a test-only override and
// can't be done in a production env.
if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) {
return super.newTransportService(settings, transport, threadPool, interceptor, clusterSettings);
return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
} else {
return new MockTransportService(settings, transport, threadPool, interceptor, clusterSettings);
return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
}
}

View File

@ -50,8 +50,7 @@ public class ClusterServiceUtils {
public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode) {
ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
clusterService.setLocalNode(localNode);
threadPool, () -> localNode);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -38,6 +39,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.tasks.MockTaskManager;
@ -68,6 +70,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
* A mock transport service that allows to simulate different network topology failures.
@ -109,7 +112,21 @@ public final class MockTransportService extends TransportService {
*/
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
@Nullable ClusterSettings clusterSettings) {
super(settings, new LookupTestTransport(transport), threadPool, interceptor, clusterSettings);
this(settings, transport, threadPool, interceptor, (boundAddress) ->
DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), settings.get(Node.NODE_NAME_SETTING.getKey(),
UUIDs.randomBase64UUID())), clusterSettings);
}
/**
* Build the service.
*
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
*/
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings) {
super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings);
this.original = transport;
}

View File

@ -107,15 +107,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
threadPool = new TestThreadPool(getClass().getName());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
// serviceA.setLocalNode(nodeA);
nodeA = serviceA.getLocalNode();
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
//serviceB.setLocalNode(nodeB);
nodeB = serviceB.getLocalNode();
// wait till all nodes are properly connected and the event has been sent, so tests in this class
// will not get this callback called on the connections done in this setup
final boolean useLocalNode = randomBoolean();
final CountDownLatch latch = new CountDownLatch(useLocalNode ? 2 : 4);
final CountDownLatch latch = new CountDownLatch(2);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
@ -130,18 +127,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.addConnectionListener(waitForConnection);
serviceB.addConnectionListener(waitForConnection);
int numHandshakes = 1;
if (useLocalNode) {
logger.info("--> using local node optimization");
serviceA.setLocalNode(nodeA);
serviceB.setLocalNode(nodeB);
} else {
logger.info("--> actively connecting to local node");
serviceA.connectToNode(nodeA);
serviceB.connectToNode(nodeB);
assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport());
assertNumHandshakes(numHandshakes, serviceB.getOriginalTransport());
numHandshakes++;
}
serviceA.connectToNode(nodeB);
serviceB.connectToNode(nodeA);
assertNumHandshakes(numHandshakes, serviceA.getOriginalTransport());
@ -205,16 +190,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testHelloWorld() {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessageResponse("hello " + request.message));
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
(request, channel) -> {
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessageResponse("hello " + request.message));
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
});
@ -338,20 +320,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testLocalNodeConnection() throws InterruptedException {
assertTrue("serviceA is not connected to nodeA", serviceA.nodeConnected(nodeA));
if (((TransportService) serviceA).getLocalNode() != null) {
// this should be a noop
serviceA.disconnectFromNode(nodeA);
}
// this should be a noop
serviceA.disconnectFromNode(nodeA);
final AtomicReference<Exception> exception = new AtomicReference<>();
serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
try {
channel.sendResponse(new StringMessageResponse(request.message));
} catch (IOException e) {
exception.set(e);
}
(request, channel) -> {
try {
channel.sendResponse(new StringMessageResponse(request.message));
} catch (IOException e) {
exception.set(e);
}
});
final AtomicReference<String> responseString = new AtomicReference<>();
@ -1601,11 +1578,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.build(),
version0,
null, true);
DiscoveryNode nodeC =
new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
DiscoveryNode nodeC = serviceC.getLocalNode();
serviceC.acceptIncomingRequests();
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch latch = new CountDownLatch(4);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
@ -1625,7 +1601,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.connectToNode(nodeB);
serviceA.connectToNode(nodeC);
serviceB.connectToNode(nodeC);
serviceC.connectToNode(nodeC);
latch.await();
serviceA.removeConnectionListener(waitForConnection);