Use cancel instead of timeout for aborting publications (#37670)

When publications were cancelled because a node turned to follower or candidate, it would still
show as time out, which can be confusing in the logs. This change adapts the improper call of
onTimeout by generalizing it to a cancel method.
This commit is contained in:
Yannick Welsch 2019-01-22 12:51:03 +01:00 committed by GitHub
parent ef2f5e4a13
commit 2a7b7ccf1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 23 deletions

View File

@ -446,7 +446,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
if (mode != Mode.CANDIDATE) { if (mode != Mode.CANDIDATE) {
mode = Mode.CANDIDATE; mode = Mode.CANDIDATE;
cancelActivePublication(); cancelActivePublication("become candidate: " + method);
joinAccumulator.close(mode); joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator(); joinAccumulator = joinHelper.new CandidateJoinAccumulator();
@ -518,7 +518,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
discoveryUpgradeService.deactivate(); discoveryUpgradeService.deactivate();
clusterFormationFailureHelper.stop(); clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler(); closePrevotingAndElectionScheduler();
cancelActivePublication(); cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode); preVoteCollector.update(getPreVoteResponse(), leaderNode);
if (restartLeaderChecker) { if (restartLeaderChecker) {
@ -902,7 +902,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override @Override
public void run() { public void run() {
synchronized (mutex) { synchronized (mutex) {
publication.onTimeout(); publication.cancel("timed out after " + publishTimeout);
} }
} }
@ -958,10 +958,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}; };
} }
private void cancelActivePublication() { private void cancelActivePublication(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) { if (currentPublication.isPresent()) {
currentPublication.get().onTimeout(); currentPublication.get().cancel(reason);
} }
} }

View File

@ -49,7 +49,7 @@ public abstract class Publication {
private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
private boolean isCompleted; // set when publication is completed private boolean isCompleted; // set when publication is completed
private boolean timedOut; // set when publication timed out private boolean cancelled; // set when publication is cancelled
public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
this.publishRequest = publishRequest; this.publishRequest = publishRequest;
@ -71,17 +71,17 @@ public abstract class Publication {
publicationTargets.forEach(PublicationTarget::sendPublishRequest); publicationTargets.forEach(PublicationTarget::sendPublishRequest);
} }
public void onTimeout() { public void cancel(String reason) {
if (isCompleted) { if (isCompleted) {
return; return;
} }
assert timedOut == false; assert cancelled == false;
timedOut = true; cancelled = true;
if (applyCommitRequest.isPresent() == false) { if (applyCommitRequest.isPresent() == false) {
logger.debug("onTimeout: [{}] timed out before committing", this); logger.debug("cancel: [{}] cancelled before committing (reason: {})", this, reason);
// fail all current publications // fail all current publications
final Exception e = new ElasticsearchException("publication timed out before committing"); final Exception e = new ElasticsearchException("publication cancelled before committing: " + reason);
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
} }
onPossibleCompletion(); onPossibleCompletion();
@ -101,7 +101,7 @@ public abstract class Publication {
return; return;
} }
if (timedOut == false) { if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) { for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) { if (target.isActive()) {
return; return;
@ -125,8 +125,8 @@ public abstract class Publication {
} }
// For assertions only: verify that this invariant holds // For assertions only: verify that this invariant holds
private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
if (timedOut == false) { if (cancelled == false) {
for (final PublicationTarget target : publicationTargets) { for (final PublicationTarget target : publicationTargets) {
if (target.isActive()) { if (target.isActive()) {
return isCompleted == false; return isCompleted == false;
@ -222,7 +222,7 @@ public abstract class Publication {
state = PublicationTargetState.SENT_PUBLISH_REQUEST; state = PublicationTargetState.SENT_PUBLISH_REQUEST;
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
// TODO Can this ^ fail with an exception? Target should be failed if so. // TODO Can this ^ fail with an exception? Target should be failed if so.
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
void handlePublishResponse(PublishResponse publishResponse) { void handlePublishResponse(PublishResponse publishResponse) {
@ -245,7 +245,7 @@ public abstract class Publication {
state = PublicationTargetState.SENT_APPLY_COMMIT; state = PublicationTargetState.SENT_APPLY_COMMIT;
assert applyCommitRequest.isPresent(); assert applyCommitRequest.isPresent();
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
void setAppliedCommit() { void setAppliedCommit() {
@ -300,7 +300,7 @@ public abstract class Publication {
public void onResponse(PublishWithJoinResponse response) { public void onResponse(PublishWithJoinResponse response) {
if (isFailed()) { if (isFailed()) {
logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode);
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
return; return;
} }
@ -319,7 +319,7 @@ public abstract class Publication {
state = PublicationTargetState.WAITING_FOR_QUORUM; state = PublicationTargetState.WAITING_FOR_QUORUM;
handlePublishResponse(response.getPublishResponse()); handlePublishResponse(response.getPublishResponse());
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
@Override @Override
@ -330,7 +330,7 @@ public abstract class Publication {
assert ((TransportException) e).getRootCause() instanceof Exception; assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause()); setFailed((Exception) exp.getRootCause());
onPossibleCommitFailure(); onPossibleCommitFailure();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
} }
@ -346,7 +346,7 @@ public abstract class Publication {
} }
setAppliedCommit(); setAppliedCommit();
onPossibleCompletion(); onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
@Override @Override
@ -357,7 +357,7 @@ public abstract class Publication {
assert ((TransportException) e).getRootCause() instanceof Exception; assert ((TransportException) e).getRootCause() instanceof Exception;
setFailed((Exception) exp.getRootCause()); setFailed((Exception) exp.getRootCause());
onPossibleCompletion(); onPossibleCompletion();
assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); assert publicationCompletedIffAllTargetsInactiveOrCancelled();
} }
} }
} }

View File

@ -346,7 +346,7 @@ public class PublicationTests extends ESTestCase {
publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> {
if (e.getKey().equals(n2)) { if (e.getKey().equals(n2)) {
if (timeOut) { if (timeOut) {
publication.onTimeout(); publication.cancel("timed out");
} else { } else {
e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); e.getValue().onFailure(new TransportException(new Exception("dummy failure")));
} }
@ -407,7 +407,7 @@ public class PublicationTests extends ESTestCase {
} }
}); });
publication.onTimeout(); publication.cancel("timed out");
assertTrue(publication.completed); assertTrue(publication.completed);
assertTrue(publication.committed); assertTrue(publication.committed);
assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS)); assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS));