Treat ack timeout more like a publish timeout (#31303)
This commit changes the ack timeout mechanism so that its behavior is closer to the publish timeout, i.e., it only comes into play after committing a cluster state. This ensures for example that an index creation request with a low (ack) timeout value does not return before the cluster state that contains information about the newly created index is even committed.
This commit is contained in:
parent
9b293275af
commit
8f886cd4be
|
@ -50,7 +50,6 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -365,28 +364,11 @@ public class MasterService extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
|
public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
|
||||||
ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>();
|
return new DelegatingAckListener(nonFailedTasks.stream()
|
||||||
|
.filter(task -> task.listener instanceof AckedClusterStateTaskListener)
|
||||||
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
|
.map(task -> new AckCountDownListener((AckedClusterStateTaskListener) task.listener, newClusterState.version(),
|
||||||
nonFailedTasks.stream().filter(task -> task.listener instanceof AckedClusterStateTaskListener).forEach(task -> {
|
newClusterState.nodes(), threadPool))
|
||||||
final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener;
|
.collect(Collectors.toList()));
|
||||||
if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
|
|
||||||
ackedListener.onAckTimeout();
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),
|
|
||||||
threadPool));
|
|
||||||
} catch (EsRejectedExecutionException ex) {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
|
|
||||||
}
|
|
||||||
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
|
|
||||||
ackedListener.onAckTimeout();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return new DelegatingAckListener(ackListeners);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean clusterStateUnchanged() {
|
public boolean clusterStateUnchanged() {
|
||||||
|
@ -549,6 +531,13 @@ public class MasterService extends AbstractLifecycleComponent {
|
||||||
this.listeners = listeners;
|
this.listeners = listeners;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCommit(TimeValue commitTime) {
|
||||||
|
for (Discovery.AckListener listener : listeners) {
|
||||||
|
listener.onCommit(commitTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
|
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
|
||||||
for (Discovery.AckListener listener : listeners) {
|
for (Discovery.AckListener listener : listeners) {
|
||||||
|
@ -564,14 +553,16 @@ public class MasterService extends AbstractLifecycleComponent {
|
||||||
private final AckedClusterStateTaskListener ackedTaskListener;
|
private final AckedClusterStateTaskListener ackedTaskListener;
|
||||||
private final CountDown countDown;
|
private final CountDown countDown;
|
||||||
private final DiscoveryNode masterNode;
|
private final DiscoveryNode masterNode;
|
||||||
|
private final ThreadPool threadPool;
|
||||||
private final long clusterStateVersion;
|
private final long clusterStateVersion;
|
||||||
private final Future<?> ackTimeoutCallback;
|
private volatile Future<?> ackTimeoutCallback;
|
||||||
private Exception lastFailure;
|
private Exception lastFailure;
|
||||||
|
|
||||||
AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
|
AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
|
||||||
ThreadPool threadPool) {
|
ThreadPool threadPool) {
|
||||||
this.ackedTaskListener = ackedTaskListener;
|
this.ackedTaskListener = ackedTaskListener;
|
||||||
this.clusterStateVersion = clusterStateVersion;
|
this.clusterStateVersion = clusterStateVersion;
|
||||||
|
this.threadPool = threadPool;
|
||||||
this.masterNode = nodes.getMasterNode();
|
this.masterNode = nodes.getMasterNode();
|
||||||
int countDown = 0;
|
int countDown = 0;
|
||||||
for (DiscoveryNode node : nodes) {
|
for (DiscoveryNode node : nodes) {
|
||||||
|
@ -581,8 +572,27 @@ public class MasterService extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
|
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
|
||||||
this.countDown = new CountDown(countDown);
|
this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called
|
||||||
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout());
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCommit(TimeValue commitTime) {
|
||||||
|
TimeValue ackTimeout = ackedTaskListener.ackTimeout();
|
||||||
|
if (ackTimeout == null) {
|
||||||
|
ackTimeout = TimeValue.ZERO;
|
||||||
|
}
|
||||||
|
final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
|
||||||
|
if (timeLeft.nanos() == 0L) {
|
||||||
|
onTimeout();
|
||||||
|
} else if (countDown.countDown()) {
|
||||||
|
finish();
|
||||||
|
} else {
|
||||||
|
this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout);
|
||||||
|
// re-check if onNodeAck has not completed while we were scheduling the timeout
|
||||||
|
if (countDown.isCountedDown()) {
|
||||||
|
FutureUtils.cancel(ackTimeoutCallback);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -599,12 +609,16 @@ public class MasterService extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (countDown.countDown()) {
|
if (countDown.countDown()) {
|
||||||
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
|
finish();
|
||||||
FutureUtils.cancel(ackTimeoutCallback);
|
|
||||||
ackedTaskListener.onAllNodesAcked(lastFailure);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void finish() {
|
||||||
|
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
|
||||||
|
FutureUtils.cancel(ackTimeoutCallback);
|
||||||
|
ackedTaskListener.onAllNodesAcked(lastFailure);
|
||||||
|
}
|
||||||
|
|
||||||
public void onTimeout() {
|
public void onTimeout() {
|
||||||
if (countDown.fastForward()) {
|
if (countDown.fastForward()) {
|
||||||
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
|
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.component.LifecycleComponent;
|
import org.elasticsearch.common.component.LifecycleComponent;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -48,6 +49,19 @@ public interface Discovery extends LifecycleComponent {
|
||||||
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
|
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
|
||||||
|
|
||||||
interface AckListener {
|
interface AckListener {
|
||||||
|
/**
|
||||||
|
* Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails,
|
||||||
|
* it is guaranteed to appear in future publications).
|
||||||
|
* @param commitTime the time it took to commit the cluster state
|
||||||
|
*/
|
||||||
|
void onCommit(TimeValue commitTime);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied
|
||||||
|
* the cluster state. In case of failures, an exception should be provided as parameter.
|
||||||
|
* @param node the node
|
||||||
|
* @param e the optional exception
|
||||||
|
*/
|
||||||
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
|
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
|
||||||
import org.elasticsearch.cluster.service.MasterService;
|
import org.elasticsearch.cluster.service.MasterService;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoveryStats;
|
import org.elasticsearch.discovery.DiscoveryStats;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
@ -61,6 +62,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
|
||||||
public synchronized void publish(final ClusterChangedEvent event,
|
public synchronized void publish(final ClusterChangedEvent event,
|
||||||
final AckListener ackListener) {
|
final AckListener ackListener) {
|
||||||
clusterState = event.state();
|
clusterState = event.state();
|
||||||
|
ackListener.onCommit(TimeValue.ZERO);
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
ClusterApplyListener listener = new ClusterApplyListener() {
|
ClusterApplyListener listener = new ClusterApplyListener() {
|
||||||
|
|
|
@ -158,7 +158,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs);
|
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
|
||||||
|
serializedDiffs);
|
||||||
} catch (Discovery.FailedToCommitClusterStateException t) {
|
} catch (Discovery.FailedToCommitClusterStateException t) {
|
||||||
throw t;
|
throw t;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -173,8 +174,9 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
|
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
|
||||||
final SendingController sendingController, final boolean sendFullVersion,
|
final SendingController sendingController, final Discovery.AckListener ackListener,
|
||||||
final Map<Version, BytesReference> serializedStates, final Map<Version, BytesReference> serializedDiffs) {
|
final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
|
||||||
|
final Map<Version, BytesReference> serializedDiffs) {
|
||||||
|
|
||||||
final ClusterState clusterState = clusterChangedEvent.state();
|
final ClusterState clusterState = clusterChangedEvent.state();
|
||||||
final ClusterState previousState = clusterChangedEvent.previousState();
|
final ClusterState previousState = clusterChangedEvent.previousState();
|
||||||
|
@ -195,8 +197,12 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
|
|
||||||
sendingController.waitForCommit(discoverySettings.getCommitTimeout());
|
sendingController.waitForCommit(discoverySettings.getCommitTimeout());
|
||||||
|
|
||||||
|
final long commitTime = System.nanoTime() - publishingStartInNanos;
|
||||||
|
|
||||||
|
ackListener.onCommit(TimeValue.timeValueNanos(commitTime));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
|
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
|
||||||
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
|
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
|
||||||
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
|
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
|
||||||
if (sendingController.getPublishingTimedOut()) {
|
if (sendingController.getPublishingTimedOut()) {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.logging.log4j.Level;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -39,6 +40,7 @@ import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
||||||
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.MockLogAppender;
|
import org.elasticsearch.test.MockLogAppender;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
|
@ -65,6 +67,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
|
@ -680,6 +683,132 @@ public class MasterServiceTests extends ESTestCase {
|
||||||
mockAppender.assertAllExpectationsMatched();
|
mockAppender.assertAllExpectationsMatched();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testAcking() throws InterruptedException {
|
||||||
|
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||||
|
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||||
|
final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||||
|
TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name",
|
||||||
|
MasterServiceTests.class.getSimpleName()).build(), threadPool);
|
||||||
|
ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
|
||||||
|
.nodes(DiscoveryNodes.builder()
|
||||||
|
.add(node1)
|
||||||
|
.add(node2)
|
||||||
|
.add(node3)
|
||||||
|
.localNodeId(node1.getId())
|
||||||
|
.masterNodeId(node1.getId()))
|
||||||
|
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
|
||||||
|
final AtomicReference<BiConsumer<ClusterChangedEvent, Discovery.AckListener>> publisherRef = new AtomicReference<>();
|
||||||
|
timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l));
|
||||||
|
timedMasterService.setClusterStateSupplier(() -> initialClusterState);
|
||||||
|
timedMasterService.start();
|
||||||
|
|
||||||
|
|
||||||
|
// check that we don't time out before even committing the cluster state
|
||||||
|
{
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
publisherRef.set((clusterChangedEvent, ackListener) -> {
|
||||||
|
throw new Discovery.FailedToCommitClusterStateException("mock exception");
|
||||||
|
});
|
||||||
|
|
||||||
|
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
return ClusterState.builder(currentState).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue ackTimeout() {
|
||||||
|
return TimeValue.ZERO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue timeout() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Void newResponse(boolean acknowledged) {
|
||||||
|
fail();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Exception e) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAckTimeout() {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that we timeout if commit took too long
|
||||||
|
{
|
||||||
|
final CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
|
||||||
|
final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
|
||||||
|
|
||||||
|
publisherRef.set((clusterChangedEvent, ackListener) -> {
|
||||||
|
ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
|
||||||
|
ackListener.onNodeAck(node1, null);
|
||||||
|
ackListener.onNodeAck(node2, null);
|
||||||
|
ackListener.onNodeAck(node3, null);
|
||||||
|
});
|
||||||
|
|
||||||
|
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
|
||||||
|
@Override
|
||||||
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
return ClusterState.builder(currentState).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue ackTimeout() {
|
||||||
|
return ackTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TimeValue timeout() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Void newResponse(boolean acknowledged) {
|
||||||
|
fail();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Exception e) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAckTimeout() {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
timedMasterService.close();
|
||||||
|
}
|
||||||
|
|
||||||
static class TimedMasterService extends MasterService {
|
static class TimedMasterService extends MasterService {
|
||||||
|
|
||||||
public volatile Long currentTimeOverride = null;
|
public volatile Long currentTimeOverride = null;
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
|
@ -815,9 +816,16 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
public static class AssertingAckListener implements Discovery.AckListener {
|
public static class AssertingAckListener implements Discovery.AckListener {
|
||||||
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
|
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
|
||||||
private final CountDownLatch countDown;
|
private final CountDownLatch countDown;
|
||||||
|
private final CountDownLatch commitCountDown;
|
||||||
|
|
||||||
public AssertingAckListener(int nodeCount) {
|
public AssertingAckListener(int nodeCount) {
|
||||||
countDown = new CountDownLatch(nodeCount);
|
countDown = new CountDownLatch(nodeCount);
|
||||||
|
commitCountDown = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCommit(TimeValue commitTime) {
|
||||||
|
commitCountDown.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -830,6 +838,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
public void await(long timeout, TimeUnit unit) throws InterruptedException {
|
public void await(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
assertThat(awaitErrors(timeout, unit), emptyIterable());
|
assertThat(awaitErrors(timeout, unit), emptyIterable());
|
||||||
|
assertTrue(commitCountDown.await(timeout, unit));
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {
|
public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue