TESTS: Real Coordinator in SnapshotServiceTests (#37162)

* TESTS: Real Coordinator in SnapshotServiceTests

* Introduce real coordinator in SnapshotServiceTests to be able to test network disruptions realistically
  * Make adjustments to cluster applier service so that we can pass a mocked single threaded executor for tests
This commit is contained in:
Armin Braun 2019-01-09 16:53:49 +01:00 committed by GitHub
parent ae086ebcc4
commit eacc63b032
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 273 additions and 82 deletions

View File

@ -56,6 +56,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
@ -872,12 +874,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return;
}
// there is no equals on cluster state, so we just serialize it to XContent and compare JSON representation
assert clusterChangedEvent.previousState() == coordinationState.get().getLastAcceptedState() ||
Strings.toString(clusterChangedEvent.previousState()).equals(
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())))
: Strings.toString(clusterChangedEvent.previousState()) + " vs "
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
assert assertPreviousStateConsistency(clusterChangedEvent);
final ClusterState clusterState = clusterChangedEvent.state();
@ -917,6 +914,22 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
// there is no equals on cluster state, so we just serialize it to XContent and compare Maps
// deserialized from the resulting JSON
private boolean assertPreviousStateConsistency(ClusterChangedEvent event) {
assert event.previousState() == coordinationState.get().getLastAcceptedState() ||
XContentHelper.convertToMap(
JsonXContent.jsonXContent, Strings.toString(event.previousState()), false
).equals(
XContentHelper.convertToMap(
JsonXContent.jsonXContent,
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())),
false))
: Strings.toString(event.previousState()) + " vs "
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
return true;
}
private <T> ActionListener<T> wrapWithMutex(ActionListener<T> listener) {
return new ActionListener<T>() {
@Override

View File

@ -89,7 +89,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Collection<TimeoutClusterStateListener> timeoutClusterStateListeners =
Collections.newSetFromMap(new ConcurrentHashMap<TimeoutClusterStateListener, Boolean>());
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final LocalNodeMasterListeners localNodeMasterListeners;
@ -134,11 +134,15 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Objects.requireNonNull(state.get(), "please set initial state before starting");
addListener(localNodeMasterListeners);
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(nodeName, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
threadPoolExecutor = createThreadPoolExecutor();
}
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return EsExecutors.newSinglePrioritizing(
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
daemonThreadFactory(nodeName, CLUSTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
}
class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {

View File

@ -71,8 +71,13 @@ public class ClusterService extends AbstractLifecycleComponent {
private final String nodeName;
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
MasterService masterService) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, new MasterService(Node.NODE_NAME_SETTING.get(settings), settings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
}
public ClusterService(Settings settings, ClusterSettings clusterSettings, MasterService masterService,
ClusterApplierService clusterApplierService) {
super(settings);
this.settings = settings;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
@ -84,11 +89,7 @@ public class ClusterService extends AbstractLifecycleComponent {
this::setSlowTaskLoggingThreshold);
// Add a no-op update consumer so changes are logged
this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings, threadPool);
}
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new MasterService(Node.NODE_NAME_SETTING.get(settings), settings, threadPool));
this.clusterApplierService = clusterApplierService;
}
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
@ -36,7 +37,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
@ -44,8 +45,13 @@ import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.CoordinatorTests;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -55,15 +61,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
@ -102,21 +109,23 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Mockito.mock;
public class SnapshotsServiceTests extends ESTestCase {
@ -132,8 +141,6 @@ public class SnapshotsServiceTests extends ESTestCase {
tempDir = createTempDir();
deterministicTaskQueue =
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
// TODO: Random number of master nodes and simulate master failover states
testClusterNodes = new TestClusterNodes(1, randomIntBetween(2, 10));
}
@After
@ -144,41 +151,40 @@ public class SnapshotsServiceTests extends ESTestCase {
n.clusterService.close();
n.indicesClusterStateService.close();
n.nodeEnv.close();
n.coordinator.close();
}
);
}
public void testSuccessfulSnapshot() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
ClusterState initialClusterState =
new ClusterState.Builder(ClusterName.DEFAULT).nodes(testClusterNodes.randomDiscoveryNodes()).build();
testClusterNodes.nodes.values().forEach(testClusterNode -> testClusterNode.start(initialClusterState));
TestClusterNode masterNode = testClusterNodes.currentMaster(initialClusterState);
final AtomicBoolean createdSnapshot = new AtomicBoolean(false);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertingListener(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(
Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shards)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)),
assertingListener(
assertNoFailureListener(
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertingListener(() -> createdSnapshot.set(true)))))));
.execute(assertNoFailureListener(() -> createdSnapshot.set(true)))))));
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(createdSnapshot.get());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.currentState.get().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
@ -191,7 +197,52 @@ public class SnapshotsServiceTests extends ESTestCase {
assertEquals(0, snapshotInfo.failedShards());
}
private static <T> ActionListener<T> assertingListener(Runnable r) {
private void startCluster() {
final ClusterState initialClusterState =
new ClusterState.Builder(ClusterName.DEFAULT).nodes(testClusterNodes.randomDiscoveryNodes()).build();
testClusterNodes.nodes.values().forEach(testClusterNode -> testClusterNode.start(initialClusterState));
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(
testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(node -> new BootstrapConfiguration.NodeDescription(node.node))
.distinct()
.collect(Collectors.toList()));
testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach(
testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(bootstrapConfiguration)
);
runUntil(
() -> {
List<String> masterNodeIds = testClusterNodes.nodes.values().stream()
.map(node -> node.clusterService.state().nodes().getMasterNodeId())
.distinct().collect(Collectors.toList());
return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false;
},
TimeUnit.SECONDS.toMillis(30L)
);
}
private void runUntil(Supplier<Boolean> fulfilled, long timeout) {
final long start = deterministicTaskQueue.getCurrentTimeMillis();
while (timeout > deterministicTaskQueue.getCurrentTimeMillis() - start) {
deterministicTaskQueue.runAllRunnableTasks();
if (fulfilled.get()) {
return;
}
deterministicTaskQueue.advanceTime();
}
fail("Condition wasn't fulfilled.");
}
private void setupTestCluster(int masterNodes, int dataNodes) {
testClusterNodes = new TestClusterNodes(masterNodes, dataNodes);
startCluster();
}
private static <T> ActionListener<T> assertNoFailureListener(Runnable r) {
return new ActionListener<T>() {
@Override
public void onResponse(final T t) {
@ -213,6 +264,8 @@ public class SnapshotsServiceTests extends ESTestCase {
.put(NODE_NAME_SETTING.getKey(), nodeName)
.put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath())
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY))
.build());
}
@ -235,10 +288,6 @@ public class SnapshotsServiceTests extends ESTestCase {
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build();
}
private static ClusterChangedEvent changeEventForNode(ClusterChangedEvent event, DiscoveryNode node) {
return new ClusterChangedEvent(event.source(), stateForNode(event.state(), node), stateForNode(event.previousState(), node));
}
private final class TestClusterNodes {
// LinkedHashMap so we have deterministic ordering when iterating over the map in tests
@ -254,8 +303,8 @@ public class SnapshotsServiceTests extends ESTestCase {
}
});
}
for (int i = masterNodes; i < dataNodes + masterNodes; ++i) {
nodes.computeIfAbsent("node" + i, nodeName -> {
for (int i = 0; i < dataNodes; ++i) {
nodes.computeIfAbsent("data-node" + i, nodeName -> {
try {
return SnapshotsServiceTests.this.newDataNode(nodeName);
} catch (IOException e) {
@ -273,10 +322,7 @@ public class SnapshotsServiceTests extends ESTestCase {
public DiscoveryNodes randomDiscoveryNodes() {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
nodes.values().forEach(node -> builder.add(node.node));
String masterId = randomFrom(nodes.values().stream().map(node -> node.node).filter(DiscoveryNode::isMasterNode)
.map(DiscoveryNode::getId)
.collect(Collectors.toList()));
return builder.localNodeId(masterId).masterNodeId(masterId).build();
return builder.build();
}
/**
@ -296,6 +342,8 @@ public class SnapshotsServiceTests extends ESTestCase {
private final Logger logger = LogManager.getLogger(TestClusterNode.class);
private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
private final TransportService transportService;
private final ClusterService clusterService;
@ -316,23 +364,30 @@ public class SnapshotsServiceTests extends ESTestCase {
private final AllocationService allocationService;
private final AtomicReference<ClusterState> currentState = new AtomicReference<>();
private final NodeClient client;
private final NodeEnvironment nodeEnv;
private final DisruptableMockTransport mockTransport;
private final ThreadPool threadPool;
private Coordinator coordinator;
TestClusterNode(DiscoveryNode node) throws IOException {
this.node = node;
final Environment environment = createEnvironment(node.getName());
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
final Settings settings = environment.settings();
allocationService = ESAllocationTestCase.createAllocationService(settings);
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
clusterService = new ClusterService(settings, clusterSettings, threadPool, masterService);
threadPool = deterministicTaskQueue.getThreadPool();
clusterService = new ClusterService(settings, clusterSettings, masterService,
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
}
});
mockTransport = new DisruptableMockTransport(logger) {
@Override
protected DiscoveryNode getLocalNode() {
@ -346,12 +401,24 @@ public class SnapshotsServiceTests extends ESTestCase {
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
return Optional.ofNullable(testClusterNodes.nodes.get(node.getName()).mockTransport);
final Predicate<TestClusterNode> matchesDestination;
if (action.equals(HANDSHAKE_ACTION_NAME)) {
matchesDestination = n -> n.transportService.getLocalNode().getAddress().equals(node.getAddress());
} else {
matchesDestination = n -> n.transportService.getLocalNode().equals(node);
}
return testClusterNodes.nodes.values().stream().filter(matchesDestination).findAny().map(cn -> cn.mockTransport);
}
@Override
protected void handle(DiscoveryNode sender, DiscoveryNode destination, String action, Runnable doDelivery) {
deterministicTaskQueue.scheduleNow(CoordinatorTests.onNode(destination, doDelivery));
// handshake needs to run inline as the caller blockingly waits on the result
final Runnable runnable = CoordinatorTests.onNode(destination, doDelivery);
if (action.equals(HANDSHAKE_ACTION_NAME)) {
runnable.run();
} else {
deterministicTaskQueue.scheduleNow(runnable);
}
}
};
transportService = mockTransport.createTransportService(
@ -382,6 +449,7 @@ public class SnapshotsServiceTests extends ESTestCase {
final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());
final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap());
client = new NodeClient(settings, threadPool);
allocationService = ESAllocationTestCase.createAllocationService(settings);
final IndexScopedSettings indexScopedSettings =
new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
indicesService = new IndicesService(
@ -393,7 +461,7 @@ public class SnapshotsServiceTests extends ESTestCase {
emptyMap(), emptyMap(), emptyMap(), emptyMap()),
indexNameExpressionResolver,
new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER),
new NamedWriteableRegistry(Collections.emptyList()),
namedWriteableRegistry,
threadPool,
indexScopedSettings,
new NoneCircuitBreakerService(),
@ -464,35 +532,23 @@ public class SnapshotsServiceTests extends ESTestCase {
transportService.acceptIncomingRequests();
snapshotsService.start();
snapshotShardsService.start();
// Mock publisher that invokes other cluster change listeners directly
masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> {
final AtomicInteger applyCounter = new AtomicInteger(testClusterNodes.nodes.size());
testClusterNodes.nodes.values().forEach(
n ->
deterministicTaskQueue.scheduleNow(() -> {
assertThat(n.currentState.get().version(), lessThan(clusterChangedEvent.state().version()));
ClusterChangedEvent adjustedEvent = changeEventForNode(clusterChangedEvent, n.node);
n.repositoriesService.applyClusterState(adjustedEvent);
n.snapshotsService.applyClusterState(adjustedEvent);
n.snapshotShardsService.clusterChanged(adjustedEvent);
n.indicesClusterStateService.applyClusterState(adjustedEvent);
n.currentState.set(adjustedEvent.state());
if (applyCounter.decrementAndGet() == 0) {
publishListener.onResponse(null);
ackListener.onCommit(TimeValue.timeValueMillis(deterministicTaskQueue.getLatestDeferredExecutionTime()));
}
}));
});
masterService.setClusterStateSupplier(currentState::get);
final CoordinationState.PersistedState persistedState =
new InMemoryPersistedState(0L, stateForNode(initialState, node));
coordinator = new Coordinator(node.getName(), clusterService.getSettings(),
clusterService.getClusterSettings(), transportService, namedWriteableRegistry,
allocationService, masterService, () -> persistedState,
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), random());
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();
ClusterState stateForNode = stateForNode(initialState, node);
currentState.set(stateForNode);
clusterService.getClusterApplierService().setInitialState(stateForNode);
clusterService.getClusterApplierService().setNodeConnectionsService(new NodeConnectionsService(clusterService.getSettings(),
deterministicTaskQueue.getThreadPool(), transportService));
clusterService.getClusterApplierService().setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
clusterService.getClusterApplierService().start();
indicesService.start();
indicesClusterStateService.start();
coordinator.startInitialJoin();
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
* simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
*/
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
r -> new Thread() {
@Override
public void start() {
deterministicTaskQueue.scheduleNow(() -> {
try {
r.run();
} catch (KillWorkerError kwe) {
// hacks everywhere
}
});
}
},
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// kill worker so that next one will be scheduled
throw new KillWorkerError();
}
private static final class KillWorkerError extends Error {
}
}

View File

@ -422,7 +422,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
}
private static DeterministicTaskQueue newTaskQueue() {
static DeterministicTaskQueue newTaskQueue() {
return newTaskQueue(random());
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.atomic.AtomicBoolean;
public class MockSinglePrioritizingExecutorTests extends ESTestCase {
public void testPrioritizedEsThreadPoolExecutor() {
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
final AtomicBoolean called1 = new AtomicBoolean();
final AtomicBoolean called2 = new AtomicBoolean();
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {
@Override
public void run() {
assertTrue(called1.compareAndSet(false, true)); // check that this is only called once
}
});
executor.execute(new PrioritizedRunnable(Priority.HIGH) {
@Override
public void run() {
assertTrue(called2.compareAndSet(false, true)); // check that this is only called once
}
});
assertFalse(called1.get());
assertFalse(called2.get());
taskQueue.runRandomTask();
assertFalse(called1.get());
assertTrue(called2.get());
taskQueue.runRandomTask();
assertTrue(called1.get());
assertTrue(called2.get());
taskQueue.runRandomTask();
assertFalse(taskQueue.hasRunnableTasks());
}
}