Treat ack timeout more like a publish timeout (#31303)

This commit changes the ack timeout mechanism so that its behavior is closer to the publish
timeout, i.e., it only comes into play after committing a cluster state. This ensures for example that
an index creation request with a low (ack) timeout value does not return before the cluster state
that contains information about the newly created index is even committed.
This commit is contained in:
Yannick Welsch 2018-06-14 18:32:35 +02:00 committed by GitHub
parent 9b293275af
commit 8f886cd4be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 207 additions and 33 deletions

View File

@ -50,7 +50,6 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -365,28 +364,11 @@ public class MasterService extends AbstractLifecycleComponent {
} }
public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) { public Discovery.AckListener createAckListener(ThreadPool threadPool, ClusterState newClusterState) {
ArrayList<Discovery.AckListener> ackListeners = new ArrayList<>(); return new DelegatingAckListener(nonFailedTasks.stream()
.filter(task -> task.listener instanceof AckedClusterStateTaskListener)
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started .map(task -> new AckCountDownListener((AckedClusterStateTaskListener) task.listener, newClusterState.version(),
nonFailedTasks.stream().filter(task -> task.listener instanceof AckedClusterStateTaskListener).forEach(task -> { newClusterState.nodes(), threadPool))
final AckedClusterStateTaskListener ackedListener = (AckedClusterStateTaskListener) task.listener; .collect(Collectors.toList()));
if (ackedListener.ackTimeout() == null || ackedListener.ackTimeout().millis() == 0) {
ackedListener.onAckTimeout();
} else {
try {
ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),
threadPool));
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
}
//timeout straightaway, otherwise we could wait forever as the timeout thread has not started
ackedListener.onAckTimeout();
}
}
});
return new DelegatingAckListener(ackListeners);
} }
public boolean clusterStateUnchanged() { public boolean clusterStateUnchanged() {
@ -549,6 +531,13 @@ public class MasterService extends AbstractLifecycleComponent {
this.listeners = listeners; this.listeners = listeners;
} }
@Override
public void onCommit(TimeValue commitTime) {
for (Discovery.AckListener listener : listeners) {
listener.onCommit(commitTime);
}
}
@Override @Override
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
for (Discovery.AckListener listener : listeners) { for (Discovery.AckListener listener : listeners) {
@ -564,14 +553,16 @@ public class MasterService extends AbstractLifecycleComponent {
private final AckedClusterStateTaskListener ackedTaskListener; private final AckedClusterStateTaskListener ackedTaskListener;
private final CountDown countDown; private final CountDown countDown;
private final DiscoveryNode masterNode; private final DiscoveryNode masterNode;
private final ThreadPool threadPool;
private final long clusterStateVersion; private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback; private volatile Future<?> ackTimeoutCallback;
private Exception lastFailure; private Exception lastFailure;
AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
ThreadPool threadPool) { ThreadPool threadPool) {
this.ackedTaskListener = ackedTaskListener; this.ackedTaskListener = ackedTaskListener;
this.clusterStateVersion = clusterStateVersion; this.clusterStateVersion = clusterStateVersion;
this.threadPool = threadPool;
this.masterNode = nodes.getMasterNode(); this.masterNode = nodes.getMasterNode();
int countDown = 0; int countDown = 0;
for (DiscoveryNode node : nodes) { for (DiscoveryNode node : nodes) {
@ -581,8 +572,27 @@ public class MasterService extends AbstractLifecycleComponent {
} }
} }
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion); logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown); this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout()); }
@Override
public void onCommit(TimeValue commitTime) {
TimeValue ackTimeout = ackedTaskListener.ackTimeout();
if (ackTimeout == null) {
ackTimeout = TimeValue.ZERO;
}
final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
if (timeLeft.nanos() == 0L) {
onTimeout();
} else if (countDown.countDown()) {
finish();
} else {
this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout);
// re-check if onNodeAck has not completed while we were scheduling the timeout
if (countDown.isCountedDown()) {
FutureUtils.cancel(ackTimeoutCallback);
}
}
} }
@Override @Override
@ -599,11 +609,15 @@ public class MasterService extends AbstractLifecycleComponent {
} }
if (countDown.countDown()) { if (countDown.countDown()) {
finish();
}
}
private void finish() {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion); logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
FutureUtils.cancel(ackTimeoutCallback); FutureUtils.cancel(ackTimeoutCallback);
ackedTaskListener.onAllNodesAcked(lastFailure); ackedTaskListener.onAllNodesAcked(lastFailure);
} }
}
public void onTimeout() { public void onTimeout() {
if (countDown.fastForward()) { if (countDown.fastForward()) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
@ -48,6 +49,19 @@ public interface Discovery extends LifecycleComponent {
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener); void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
interface AckListener { interface AckListener {
/**
* Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails,
* it is guaranteed to appear in future publications).
* @param commitTime the time it took to commit the cluster state
*/
void onCommit(TimeValue commitTime);
/**
* Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied
* the cluster state. In case of failures, an exception should be provided as parameter.
* @param node the node
* @param e the optional exception
*/
void onNodeAck(DiscoveryNode node, @Nullable Exception e); void onNodeAck(DiscoveryNode node, @Nullable Exception e);
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -61,6 +62,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
public synchronized void publish(final ClusterChangedEvent event, public synchronized void publish(final ClusterChangedEvent event,
final AckListener ackListener) { final AckListener ackListener) {
clusterState = event.state(); clusterState = event.state();
ackListener.onCommit(TimeValue.ZERO);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
ClusterApplyListener listener = new ClusterApplyListener() { ClusterApplyListener listener = new ClusterApplyListener() {

View File

@ -158,7 +158,8 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
try { try {
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, sendFullVersion, serializedStates, serializedDiffs); innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
serializedDiffs);
} catch (Discovery.FailedToCommitClusterStateException t) { } catch (Discovery.FailedToCommitClusterStateException t) {
throw t; throw t;
} catch (Exception e) { } catch (Exception e) {
@ -173,8 +174,9 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo, private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
final SendingController sendingController, final boolean sendFullVersion, final SendingController sendingController, final Discovery.AckListener ackListener,
final Map<Version, BytesReference> serializedStates, final Map<Version, BytesReference> serializedDiffs) { final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
final Map<Version, BytesReference> serializedDiffs) {
final ClusterState clusterState = clusterChangedEvent.state(); final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState(); final ClusterState previousState = clusterChangedEvent.previousState();
@ -195,8 +197,12 @@ public class PublishClusterStateAction extends AbstractComponent {
sendingController.waitForCommit(discoverySettings.getCommitTimeout()); sendingController.waitForCommit(discoverySettings.getCommitTimeout());
final long commitTime = System.nanoTime() - publishingStartInNanos;
ackListener.onCommit(TimeValue.timeValueNanos(commitTime));
try { try {
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos)); long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler(); final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos))); sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
if (sendingController.getPublishingTimedOut()) { if (sendingController.getPublishingTimedOut()) {

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -39,6 +40,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -65,6 +67,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -680,6 +683,132 @@ public class MasterServiceTests extends ESTestCase {
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} }
public void testAcking() throws InterruptedException {
final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
TimedMasterService timedMasterService = new TimedMasterService(Settings.builder().put("cluster.name",
MasterServiceTests.class.getSimpleName()).build(), threadPool);
ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(DiscoveryNodes.builder()
.add(node1)
.add(node2)
.add(node3)
.localNodeId(node1.getId())
.masterNodeId(node1.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
final AtomicReference<BiConsumer<ClusterChangedEvent, Discovery.AckListener>> publisherRef = new AtomicReference<>();
timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l));
timedMasterService.setClusterStateSupplier(() -> initialClusterState);
timedMasterService.start();
// check that we don't time out before even committing the cluster state
{
final CountDownLatch latch = new CountDownLatch(1);
publisherRef.set((clusterChangedEvent, ackListener) -> {
throw new Discovery.FailedToCommitClusterStateException("mock exception");
});
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
}
@Override
public TimeValue ackTimeout() {
return TimeValue.ZERO;
}
@Override
public TimeValue timeout() {
return null;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
protected Void newResponse(boolean acknowledged) {
fail();
return null;
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}
@Override
public void onAckTimeout() {
fail();
}
});
latch.await();
}
// check that we timeout if commit took too long
{
final CountDownLatch latch = new CountDownLatch(2);
final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
publisherRef.set((clusterChangedEvent, ackListener) -> {
ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
ackListener.onNodeAck(node1, null);
ackListener.onNodeAck(node2, null);
ackListener.onNodeAck(node3, null);
});
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).build();
}
@Override
public TimeValue ackTimeout() {
return ackTimeout;
}
@Override
public TimeValue timeout() {
return null;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
protected Void newResponse(boolean acknowledged) {
fail();
return null;
}
@Override
public void onFailure(String source, Exception e) {
fail();
}
@Override
public void onAckTimeout() {
latch.countDown();
}
});
latch.await();
}
timedMasterService.close();
}
static class TimedMasterService extends MasterService { static class TimedMasterService extends MasterService {
public volatile Long currentTimeOverride = null; public volatile Long currentTimeOverride = null;

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
@ -815,9 +816,16 @@ public class PublishClusterStateActionTests extends ESTestCase {
public static class AssertingAckListener implements Discovery.AckListener { public static class AssertingAckListener implements Discovery.AckListener {
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>(); private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
private final CountDownLatch countDown; private final CountDownLatch countDown;
private final CountDownLatch commitCountDown;
public AssertingAckListener(int nodeCount) { public AssertingAckListener(int nodeCount) {
countDown = new CountDownLatch(nodeCount); countDown = new CountDownLatch(nodeCount);
commitCountDown = new CountDownLatch(1);
}
@Override
public void onCommit(TimeValue commitTime) {
commitCountDown.countDown();
} }
@Override @Override
@ -830,6 +838,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void await(long timeout, TimeUnit unit) throws InterruptedException { public void await(long timeout, TimeUnit unit) throws InterruptedException {
assertThat(awaitErrors(timeout, unit), emptyIterable()); assertThat(awaitErrors(timeout, unit), emptyIterable());
assertTrue(commitCountDown.await(timeout, unit));
} }
public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {