Apply cluster states in system context (#53785)

Today cluster states are sometimes (though rarely) applied in the default thread
context rather than in the system context, which means that any appliers which
capture their contexts cannot, for example, perform remote transport actions when
security is enabled.

There are at least two ways that we end up applying the cluster state in the
default context:

1. locally applying a cluster state that indicates that the master has failed
2. the elected master times out while waiting for a response from another node

This commit ensures that cluster states are always applied in the system
context.

Mitigates #53751
David Turner 2020-03-19 14:13:52 +00:00
parent 00203c35fe
commit 4178c57410
11 changed files with 102 additions and 39 deletions
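At the core of this change is one pattern, repeated wherever a cluster state update is handed off for application: stash the caller's thread context, mark the context as the system context, and only then run or enqueue the task. A minimal sketch of that pattern follows; the SystemContextRunner helper is made up purely for illustration, the actual changes are in the ClusterApplierService and FakeThreadPoolMasterService hunks below.

import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;

// Hypothetical helper, for illustration only: run a task in the system context
// and restore the caller's own context afterwards.
final class SystemContextRunner {
    static void runAsSystemContext(ThreadPool threadPool, Runnable task) {
        final ThreadContext threadContext = threadPool.getThreadContext();
        try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
            // Everything from here on sees the system context, so appliers that
            // capture the context can still perform remote transport actions
            // when security is enabled.
            threadContext.markAsSystemContext();
            task.run();
        } // closing the stashed context restores the caller's original context
    }
}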

View File

@ -185,6 +185,7 @@ public class PublicationTransportHandler {
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> originalListener) {
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
+assert transportService.getThreadPool().getThreadContext().isSystemContext();
final ActionListener<PublishWithJoinResponse> responseActionListener;
if (destination.equals(nodes.getLocalNode())) {
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@ -221,6 +222,7 @@ public class PublicationTransportHandler {
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
+assert transportService.getThreadPool().getThreadContext().isSystemContext();
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {

View File

@ -46,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
@ -347,7 +348,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
if (!lifecycle.started()) {
return;
}
-try {
+final ThreadContext threadContext = threadPool.getThreadContext();
+try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
+threadContext.markAsSystemContext();
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
if (config.timeout() != null) {
threadPoolExecutor.execute(updateTask, config.timeout(),
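The try-with-resources around the stashed context is what keeps this safe for callers: the system context is only in effect inside the block, and the caller's original context is restored when the block closes, so submitting a cluster state update does not leak the system context back to the caller. A small, self-contained illustration of these ThreadContext semantics (the "request.id" header is invented for this example):

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;

public class ThreadContextStashExample {
    public static void main(String[] args) {
        final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
        threadContext.putHeader("request.id", "1234");
        try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
            threadContext.markAsSystemContext();
            assert threadContext.isSystemContext();                   // now the system context
            assert threadContext.getHeader("request.id") == null;     // caller's headers are stashed away
        }
        assert threadContext.isSystemContext() == false;              // restored when the stash closes
        assert "1234".equals(threadContext.getHeader("request.id"));  // original header is back
    }
}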

View File

@ -122,9 +122,10 @@ public class NodeJoinTests extends ESTestCase {
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
deterministicTaskQueue
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
+final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
-deterministicTaskQueue::scheduleNow);
-setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
+fakeThreadPool, deterministicTaskQueue::scheduleNow);
+setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
coordinator.handlePublishRequest(new PublishRequest(event.state()));
publishListener.onResponse(null);

View File

@ -1167,15 +1167,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
TestClusterNode(DiscoveryNode node) throws IOException {
this.node = node;
final Environment environment = createEnvironment(node.getName());
-masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
+threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
+masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
final Settings settings = environment.settings();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-threadPool = deterministicTaskQueue.getThreadPool();
clusterService = new ClusterService(settings, clusterSettings, masterService,
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
+return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
}
@Override
@ -1215,7 +1215,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
}
};
transportService = mockTransport.createTransportService(
-settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
+settings, threadPool,
new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

View File

@ -911,6 +911,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
}
private void setUp() {
+final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
mockTransport = new DisruptableMockTransport(localNode, logger) {
@Override
protected void execute(Runnable runnable) {
@ -928,24 +929,20 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
}
};
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
nodeSettings : Settings.builder().put(nodeSettings)
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
-transportService = mockTransport.createTransportService(
-settings, deterministicTaskQueue.getThreadPool(this::onNode),
-getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
-a -> localNode, null, emptySet());
-masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
+transportService = mockTransport.createTransportService(settings, threadPool,
+getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
+masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
-deterministicTaskQueue, this::onNode);
+deterministicTaskQueue, threadPool);
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(
-new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
-transportService));
+new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@ -955,7 +952,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
getElectionStrategy());
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
-deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
+threadPool, null, coordinator);
logger.trace("starting up [{}]", localNode);
transportService.start();
@ -1292,8 +1289,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
AckCollector nextAckCollector = new AckCollector();
-AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
-super(nodeName, serviceName, onTaskAvailableToRun);
+AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
+Consumer<Runnable> onTaskAvailableToRun) {
+super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
}
@Override
@ -1323,8 +1321,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
private boolean applicationMayFail;
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
-DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
-super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
+DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
+super(nodeName, settings, clusterSettings, threadPool);
this.nodeName = nodeName;
this.deterministicTaskQueue = deterministicTaskQueue;
addStateApplier(event -> {
@ -1344,7 +1342,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
+return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
}
@Override

View File

@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
@ -29,7 +30,7 @@ import java.util.concurrent.TimeUnit;
*/
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
-public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
+public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
r -> new Thread() {
@Override
@ -51,7 +52,7 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu
});
}
},
-deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
+threadPool.getThreadContext(), threadPool.scheduler());
}
@Override

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Collection;
import java.util.Collections;
public class ClusterApplierAssertionPlugin extends Plugin {
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver) {
clusterService.addStateApplier(event -> {
assert threadPool.getThreadContext().isSystemContext();
});
clusterService.addListener(event -> {
assert threadPool.getThreadContext().isSystemContext();
});
return Collections.emptyList();
}
}

View File

@ -41,8 +41,6 @@ import java.util.function.Consumer;
import static org.apache.lucene.util.LuceneTestCase.random;
import static org.elasticsearch.test.ESTestCase.randomInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class FakeThreadPoolMasterService extends MasterService {
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService {
private boolean taskInProgress = false;
private boolean waitForPublish = false;
-public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
+public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
+Consumer<Runnable> onTaskAvailableToRun) {
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
-new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-createMockThreadPool());
+new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
this.name = serviceName;
this.onTaskAvailableToRun = onTaskAvailableToRun;
}
-private static ThreadPool createMockThreadPool() {
-final ThreadContext context = new ThreadContext(Settings.EMPTY);
-final ThreadPool mockThreadPool = mock(ThreadPool.class);
-when(mockThreadPool.getThreadContext()).thenReturn(context);
-return mockThreadPool;
-}
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@ -110,7 +101,11 @@ public class FakeThreadPoolMasterService extends MasterService {
final Runnable task = pendingTasks.remove(taskIndex);
taskInProgress = true;
scheduledNextTask = false;
+final ThreadContext threadContext = threadPool.getThreadContext();
+try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
+threadContext.markAsSystemContext();
task.run();
+}
if (waitForPublish == false) {
taskInProgress = false;
}
@ -168,4 +163,5 @@ public class FakeThreadPoolMasterService extends MasterService {
protected AckListener wrapAckListener(AckListener ackListener) {
return ackListener;
}
}

View File

@ -79,6 +79,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
@ -1935,6 +1936,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
if (randomBoolean()) {
mocks.add(MockFieldFilterPlugin.class);
}
+if (randomBoolean()) {
+mocks.add(ClusterApplierAssertionPlugin.class);
+}
}
if (addMockTransportService()) {

View File

@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
public void testPrioritizedEsThreadPoolExecutor() {
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
-final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
+final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
final AtomicBoolean called1 = new AtomicBoolean();
final AtomicBoolean called2 = new AtomicBoolean();
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

View File

@ -27,7 +27,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collections;
@ -37,6 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class FakeThreadPoolMasterServiceTests extends ESTestCase {
@ -48,7 +53,10 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode));
long firstClusterStateVersion = lastClusterStateRef.get().version();
AtomicReference<ActionListener<Void>> publishingCallback = new AtomicReference<>();
-FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add);
+final ThreadContext context = new ThreadContext(Settings.EMPTY);
+final ThreadPool mockThreadPool = mock(ThreadPool.class);
+when(mockThreadPool.getThreadContext()).thenReturn(context);
+FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
masterService.setClusterStateSupplier(lastClusterStateRef::get);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
lastClusterStateRef.set(event.state());