Revert "Apply cluster states in system context (#53785)"
This reverts commit 4178c57410
.
This commit is contained in:
parent
4178c57410
commit
7d3ac4f57d
|
@ -185,7 +185,6 @@ public class PublicationTransportHandler {
|
|||
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
|
||||
ActionListener<PublishWithJoinResponse> originalListener) {
|
||||
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
|
||||
assert transportService.getThreadPool().getThreadContext().isSystemContext();
|
||||
final ActionListener<PublishWithJoinResponse> responseActionListener;
|
||||
if (destination.equals(nodes.getLocalNode())) {
|
||||
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
|
||||
|
@ -222,7 +221,6 @@ public class PublicationTransportHandler {
|
|||
@Override
|
||||
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
|
||||
ActionListener<TransportResponse.Empty> responseActionListener) {
|
||||
assert transportService.getThreadPool().getThreadContext().isSystemContext();
|
||||
final String actionName;
|
||||
final TransportRequest transportRequest;
|
||||
if (Coordinator.isZen1Node(destination)) {
|
||||
|
|
|
@ -46,7 +46,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -348,9 +347,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
|
|||
if (!lifecycle.started()) {
|
||||
return;
|
||||
}
|
||||
final ThreadContext threadContext = threadPool.getThreadContext();
|
||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||
threadContext.markAsSystemContext();
|
||||
try {
|
||||
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
|
||||
if (config.timeout() != null) {
|
||||
threadPoolExecutor.execute(updateTask, config.timeout(),
|
||||
|
|
|
@ -122,10 +122,9 @@ public class NodeJoinTests extends ESTestCase {
|
|||
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
|
||||
deterministicTaskQueue
|
||||
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
|
||||
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
|
||||
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
|
||||
fakeThreadPool, deterministicTaskQueue::scheduleNow);
|
||||
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
|
||||
deterministicTaskQueue::scheduleNow);
|
||||
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
|
||||
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
|
||||
coordinator.handlePublishRequest(new PublishRequest(event.state()));
|
||||
publishListener.onResponse(null);
|
||||
|
|
|
@ -1167,15 +1167,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
TestClusterNode(DiscoveryNode node) throws IOException {
|
||||
this.node = node;
|
||||
final Environment environment = createEnvironment(node.getName());
|
||||
threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
|
||||
masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
|
||||
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
|
||||
final Settings settings = environment.settings();
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
threadPool = deterministicTaskQueue.getThreadPool();
|
||||
clusterService = new ClusterService(settings, clusterSettings, masterService,
|
||||
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
|
||||
@Override
|
||||
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
||||
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
|
||||
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1215,7 +1215,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
transportService = mockTransport.createTransportService(
|
||||
settings, threadPool,
|
||||
settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
|
||||
new TransportInterceptor() {
|
||||
@Override
|
||||
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
|
||||
|
|
|
@ -911,7 +911,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
private void setUp() {
|
||||
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
|
||||
mockTransport = new DisruptableMockTransport(localNode, logger) {
|
||||
@Override
|
||||
protected void execute(Runnable runnable) {
|
||||
|
@ -929,20 +928,24 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
|
||||
}
|
||||
};
|
||||
|
||||
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
|
||||
nodeSettings : Settings.builder().put(nodeSettings)
|
||||
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
||||
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
|
||||
transportService = mockTransport.createTransportService(settings, threadPool,
|
||||
getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
|
||||
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
|
||||
transportService = mockTransport.createTransportService(
|
||||
settings, deterministicTaskQueue.getThreadPool(this::onNode),
|
||||
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
|
||||
a -> localNode, null, emptySet());
|
||||
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
|
||||
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
|
||||
deterministicTaskQueue, threadPool);
|
||||
deterministicTaskQueue, this::onNode);
|
||||
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
|
||||
clusterService.setNodeConnectionsService(
|
||||
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
|
||||
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
|
||||
transportService));
|
||||
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
|
||||
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
|
||||
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
|
||||
|
@ -952,7 +955,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
getElectionStrategy());
|
||||
masterService.setClusterStatePublisher(coordinator);
|
||||
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
|
||||
threadPool, null, coordinator);
|
||||
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
|
||||
|
||||
logger.trace("starting up [{}]", localNode);
|
||||
transportService.start();
|
||||
|
@ -1289,9 +1292,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
|
||||
AckCollector nextAckCollector = new AckCollector();
|
||||
|
||||
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
|
||||
Consumer<Runnable> onTaskAvailableToRun) {
|
||||
super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
|
||||
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
|
||||
super(nodeName, serviceName, onTaskAvailableToRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1321,8 +1323,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
private boolean applicationMayFail;
|
||||
|
||||
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
|
||||
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
|
||||
super(nodeName, settings, clusterSettings, threadPool);
|
||||
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
|
||||
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
|
||||
this.nodeName = nodeName;
|
||||
this.deterministicTaskQueue = deterministicTaskQueue;
|
||||
addStateApplier(event -> {
|
||||
|
@ -1342,7 +1344,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|||
|
||||
@Override
|
||||
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
||||
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
|
||||
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.cluster.coordination;
|
|||
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -30,7 +29,7 @@ import java.util.concurrent.TimeUnit;
|
|||
*/
|
||||
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
|
||||
|
||||
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
|
||||
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
|
||||
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
|
||||
r -> new Thread() {
|
||||
@Override
|
||||
|
@ -52,7 +51,7 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu
|
|||
});
|
||||
}
|
||||
},
|
||||
threadPool.getThreadContext(), threadPool.scheduler());
|
||||
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.cluster.service;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
public class ClusterApplierAssertionPlugin extends Plugin {
|
||||
@Override
|
||||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
clusterService.addStateApplier(event -> {
|
||||
assert threadPool.getThreadContext().isSystemContext();
|
||||
});
|
||||
clusterService.addListener(event -> {
|
||||
assert threadPool.getThreadContext().isSystemContext();
|
||||
});
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
|
@ -41,6 +41,8 @@ import java.util.function.Consumer;
|
|||
|
||||
import static org.apache.lucene.util.LuceneTestCase.random;
|
||||
import static org.elasticsearch.test.ESTestCase.randomInt;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class FakeThreadPoolMasterService extends MasterService {
|
||||
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
|
||||
|
@ -52,14 +54,21 @@ public class FakeThreadPoolMasterService extends MasterService {
|
|||
private boolean taskInProgress = false;
|
||||
private boolean waitForPublish = false;
|
||||
|
||||
public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
|
||||
Consumer<Runnable> onTaskAvailableToRun) {
|
||||
public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
|
||||
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
createMockThreadPool());
|
||||
this.name = serviceName;
|
||||
this.onTaskAvailableToRun = onTaskAvailableToRun;
|
||||
}
|
||||
|
||||
private static ThreadPool createMockThreadPool() {
|
||||
final ThreadContext context = new ThreadContext(Settings.EMPTY);
|
||||
final ThreadPool mockThreadPool = mock(ThreadPool.class);
|
||||
when(mockThreadPool.getThreadContext()).thenReturn(context);
|
||||
return mockThreadPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
||||
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
|
||||
|
@ -101,11 +110,7 @@ public class FakeThreadPoolMasterService extends MasterService {
|
|||
final Runnable task = pendingTasks.remove(taskIndex);
|
||||
taskInProgress = true;
|
||||
scheduledNextTask = false;
|
||||
final ThreadContext threadContext = threadPool.getThreadContext();
|
||||
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
|
||||
threadContext.markAsSystemContext();
|
||||
task.run();
|
||||
}
|
||||
task.run();
|
||||
if (waitForPublish == false) {
|
||||
taskInProgress = false;
|
||||
}
|
||||
|
@ -163,5 +168,4 @@ public class FakeThreadPoolMasterService extends MasterService {
|
|||
protected AckListener wrapAckListener(AckListener ackListener) {
|
||||
return ackListener;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -79,7 +79,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -1936,9 +1935,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
if (randomBoolean()) {
|
||||
mocks.add(MockFieldFilterPlugin.class);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
mocks.add(ClusterApplierAssertionPlugin.class);
|
||||
}
|
||||
}
|
||||
|
||||
if (addMockTransportService()) {
|
||||
|
|
|
@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
|
|||
|
||||
public void testPrioritizedEsThreadPoolExecutor() {
|
||||
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
|
||||
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
|
||||
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
|
||||
final AtomicBoolean called1 = new AtomicBoolean();
|
||||
final AtomicBoolean called2 = new AtomicBoolean();
|
||||
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {
|
||||
|
|
|
@ -27,10 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -40,8 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class FakeThreadPoolMasterServiceTests extends ESTestCase {
|
||||
|
||||
|
@ -53,10 +48,7 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
|
|||
lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode));
|
||||
long firstClusterStateVersion = lastClusterStateRef.get().version();
|
||||
AtomicReference<ActionListener<Void>> publishingCallback = new AtomicReference<>();
|
||||
final ThreadContext context = new ThreadContext(Settings.EMPTY);
|
||||
final ThreadPool mockThreadPool = mock(ThreadPool.class);
|
||||
when(mockThreadPool.getThreadContext()).thenReturn(context);
|
||||
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
|
||||
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add);
|
||||
masterService.setClusterStateSupplier(lastClusterStateRef::get);
|
||||
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
|
||||
lastClusterStateRef.set(event.state());
|
||||
|
|
Loading…
Reference in New Issue