Only accept transport requests after node is fully initialized #16746

We should open up the node to the world when it's as ready as possiblAt the moment we open up the transport service before the local node has been fully initialized. This causes bug as some data structures are not fully initialized yet. See for example #16723.

Sadly, we can't just start the TransportService last (as we do with the HTTP server) because the ClusterService needs to know the bound published network address for the local DiscoveryNode. This address can only be determined by actually binding (people may use, for example, port 0). Instead we start the TransportService as late as possible but block any incoming requests until the node has completed initialization.

A couple of other cleanup during start time:
1) The gateway service now starts before the initial cluster join so we can simplify the logic to recover state if the local node has become master.
2) The discovery is started before the transport service accepts requests, but we only start the join process later using a dedicated method.

Closes #16723
Closes #16746
This commit is contained in:
Boaz Leskes 2016-02-18 13:57:59 -08:00
parent c978335968
commit 5a91ad1115
22 changed files with 134 additions and 53 deletions

View File

@ -39,7 +39,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
@ -155,7 +154,10 @@ public class TransportClient extends AbstractClient {
pluginsService.processModules(modules);
Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();
final TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
transportService.acceptIncomingRequests();
TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;

View File

@ -94,6 +94,12 @@ public interface Discovery extends LifecycleComponent<Discovery> {
DiscoveryStats stats();
/**
* Triggers the first join cycle
*/
void startInitialJoin();
/***
* @return the current value of minimum master nodes, or -1 for not set
*/

View File

@ -87,8 +87,9 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
logger.info(discovery.nodeDescription());
}
public void waitForInitialState() {
public void joinClusterAndWaitForInitialState() {
try {
discovery.startInitialJoin();
if (!initialStateListener.waitForInitialState(initialStateTimeout)) {
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}

View File

@ -100,6 +100,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
protected void doStart() {
}
@Override
public void startInitialJoin() {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {

View File

@ -216,7 +216,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
}
@Override
public void startInitialJoin() {
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
clusterService.submitStateUpdateTask("initial_join", new ClusterStateUpdateTask() {

View File

@ -133,27 +133,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Override
protected void doStart() {
clusterService.addLast(this);
// check we didn't miss any cluster state that came in until now / during the addition
clusterService.submitStateUpdateTask("gateway_initial_state_recovery", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
checkStateMeetsSettingsAndMaybeRecover(currentState);
return currentState;
}
@Override
public boolean runOnlyOnMaster() {
// It's OK to run on non masters as checkStateMeetsSettingsAndMaybeRecover checks for this
// we return false to avoid unneeded failure logs
return false;
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("unexpected failure while checking if state can be recovered. another attempt will be made with the next cluster state change", t);
}
});
}
@Override
@ -170,10 +149,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
if (lifecycle.stoppedOrClosed()) {
return;
}
checkStateMeetsSettingsAndMaybeRecover(event.state());
}
protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state) {
final ClusterState state = event.state();
if (state.nodes().localNodeMaster() == false) {
// not our job to recover
return;

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -50,7 +49,6 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -279,8 +277,6 @@ public class Node implements Closeable {
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(MonitorService.class).start();
@ -289,16 +285,24 @@ public class Node implements Closeable {
// TODO hack around circular dependencies problems
injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();
// gateway should start after disco, so it can try and recovery from gateway on "start"
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
injector.getInstance(ClusterService.class).start();
// start after cluster service so the local disco is known
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
transportService.acceptIncomingRequests();
discoService.joinClusterAndWaitForInitialState();
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).start();
}
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(TribeService.class).start();
if (WRITE_PORTS_FIELD_SETTING.get(settings)) {

View File

@ -54,8 +54,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
@ -71,7 +71,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
private final AtomicBoolean started = new AtomicBoolean(false);
private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
protected final Transport transport;
protected final ThreadPool threadPool;
protected final TaskManager taskManager;
@ -167,6 +167,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
void setTracerLogExclude(List<String> tracelLogExclude) {
this.tracelLogExclude = tracelLogExclude.toArray(Strings.EMPTY_ARRAY);
}
@Override
protected void doStart() {
adapter.rxMetric.clear();
@ -179,14 +180,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
}
}
boolean setStarted = started.compareAndSet(false, true);
assert setStarted : "service was already started";
}
@Override
protected void doStop() {
final boolean setStopped = started.compareAndSet(true, false);
assert setStopped : "service has already been stopped";
try {
transport.stop();
} finally {
@ -213,6 +210,15 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
transport.close();
}
/**
* start accepting incoming requests.
* when the transport layer starts up it will block any incoming requests until
* this method is called
*/
public void acceptIncomingRequests() {
blockIncomingRequestsLatch.countDown();
}
public boolean addressSupported(Class<? extends TransportAddress> address) {
return transport.addressSupported(address);
}
@ -302,7 +308,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
timeoutHandler = new TimeoutHandler(requestId);
}
clientHandlers.put(requestId, new RequestHolder<>(new ContextRestoreResponseHandler<T>(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler));
if (started.get() == false) {
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
// it will only notify if the toStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
@ -405,10 +411,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
/**
* Registers a new request handler
* @param action The action the request handler is associated with
*
* @param action The action the request handler is associated with
* @param requestFactory a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
@ -417,11 +424,12 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
/**
* Registers a new request handler
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param handler The handler itself that implements the request handling
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
@ -494,6 +502,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override
public void onRequestReceived(long requestId, String action) {
try {
blockIncomingRequestsLatch.await();
} catch (InterruptedException e) {
logger.trace("interrupted while waiting for incoming requests block to be removed");
}
if (traceEnabled() && shouldTraceAction(action)) {
traceReceivedRequest(requestId, action);
}
@ -729,6 +742,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final TransportResponseHandler<T> delegate;
private final ThreadContext.StoredContext threadContext;
private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) {
this.delegate = delegate;
this.threadContext = threadContext;
@ -766,7 +780,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final ThreadPool threadPool;
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId,
TransportServiceAdapter adapter, ThreadPool threadPool) {
TransportServiceAdapter adapter, ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action;

View File

@ -203,6 +203,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
actionFilters, indexNameExpressionResolver);
transportCancelTasksAction = new TransportCancelTasksAction(settings, clusterName, threadPool, clusterService, transportService,
actionFilters, indexNameExpressionResolver);
transportService.acceptIncomingRequests();
}
public final TestClusterService clusterService;

View File

@ -185,6 +185,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
clusterService = new TestClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
setClusterState(clusterService, TEST_INDEX);
action = new TestTransportBroadcastByNodeAction(
Settings.EMPTY,

View File

@ -84,6 +84,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
localNode = new DiscoveryNode("local_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
remoteNode = new DiscoveryNode("remote_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
allNodes = new DiscoveryNode[] { localNode, remoteNode };

View File

@ -88,6 +88,7 @@ public class BroadcastReplicationTests extends ESTestCase {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), null);
}

View File

@ -18,8 +18,6 @@
*/
package org.elasticsearch.action.support.replication;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ReplicationResponse;
@ -126,6 +124,7 @@ public class TransportReplicationActionTests extends ESTestCase {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool);
count.set(1);
}

View File

@ -141,6 +141,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(
Settings.EMPTY,
"indices:admin/test",

View File

@ -73,6 +73,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);
nodesCount = randomIntBetween(1, 10);

View File

@ -107,6 +107,7 @@ public class ShardStateActionTests extends ESTestCase {
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {});
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {});

View File

@ -108,6 +108,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;
}

View File

@ -58,12 +58,14 @@ public class UnicastZenPingIT extends ESTestCase {
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
transportServiceA.acceptIncomingRequests();
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
transportServiceB.acceptIncomingRequests();
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);
InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();

View File

@ -232,6 +232,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected MockTransportService buildTransportService(Settings settings, Version version) {
MockTransportService transportService = MockTransportService.local(Settings.EMPTY, version, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;
}

View File

@ -72,6 +72,11 @@ public class NoopDiscovery implements Discovery {
return null;
}
@Override
public void startInitialJoin() {
}
@Override
public int getMinimumMasterNodes() {
return -1;

View File

@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
@ -74,11 +75,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Settings.builder().put("name", "TS_A", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
version0, new NamedWriteableRegistry()
);
serviceA.acceptIncomingRequests();
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), version0);
serviceB = build(
Settings.builder().put("name", "TS_B", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
version1, new NamedWriteableRegistry()
);
serviceB.acceptIncomingRequests();
nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), version1);
// wait till all nodes are properly connected and the event has been sent, so tests in this class
@ -1254,6 +1257,54 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertTrue(nodeB.address().sameHost(addressB.get()));
}
public void testBlockingIncomingRequests() throws Exception {
TransportService service = build(
Settings.builder().put("name", "TS_TEST", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
version0, new NamedWriteableRegistry()
);
AtomicBoolean requestProcessed = new AtomicBoolean();
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
requestProcessed.set(true);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
DiscoveryNode node = new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), version0);
serviceA.connectToNode(node);
CountDownLatch latch = new CountDownLatch(1);
serviceA.sendRequest(node, "action", new TestRequest(), new TransportResponseHandler<TestResponse>() {
@Override
public TestResponse newInstance() {
return new TestResponse();
}
@Override
public void handleResponse(TestResponse response) {
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
latch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
assertFalse(requestProcessed.get());
service.acceptIncomingRequests();
assertBusy(() -> assertTrue(requestProcessed.get()));
latch.await();
service.close();
}
public static class TestRequest extends TransportRequest {
}

View File

@ -56,11 +56,13 @@ public class NettyScheduledPingTests extends ESTestCase {
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, registryA);
serviceA.start();
serviceA.acceptIncomingRequests();
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, registryB);
serviceB.start();
serviceB.acceptIncomingRequests();
DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), Version.CURRENT);
DiscoveryNode nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), Version.CURRENT);