Handle serialization exceptions during publication (#41781)

Today if an exception is thrown when serializing a cluster state during
publication then the master enters a poisoned state where it cannot publish any
more cluster states, but nor does it stand down as master, yielding repeated
exceptions of the following form:

```
failed to commit cluster state version [12345]
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException: publishing failed
        at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1045) ~[elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService.publish(MasterService.java:252) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:238) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) [elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) [elasticsearch-7.0.0.jar:7.0.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: org.elasticsearch.cluster.coordination.CoordinationStateRejectedException: cannot start publishing next value before accepting previous one
        at org.elasticsearch.cluster.coordination.CoordinationState.handleClientValue(CoordinationState.java:280) ~[elasticsearch-7.0.0.jar:7.0.0]
        at org.elasticsearch.cluster.coordination.Coordinator.publish(Coordinator.java:1030) ~[elasticsearch-7.0.0.jar:7.0.0]
        ... 11 more
```

This is because it already created the publication request using
`CoordinationState#handleClientValue()` but then it fails before accepting it.
This commit addresses this by performing the serialization before calling
`handleClientValue()`.

Relates #41090, which was the source of such a serialization exception.
This commit is contained in:
David Turner 2019-05-07 17:39:07 +01:00
parent 4cca1e8fff
commit 935f70c05e
2 changed files with 68 additions and 2 deletions

View File

@ -502,7 +502,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
});
}
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
synchronized (mutex) {
@ -1027,9 +1026,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())) :
getLocalNode() + " should be in published " + clusterState;
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final PublicationTransportHandler.PublicationContext publicationContext =
publicationHandler.newPublicationContext(clusterChangedEvent);
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
new ListenableFuture<>(), ackListener, publishListener);
currentPublication = Optional.of(publication);

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
@ -54,6 +55,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -63,6 +65,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
@ -94,6 +97,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -136,6 +140,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -1150,6 +1155,67 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise();
}
private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {
static final String EXCEPTION_MESSAGE = "simulated";
@Override
public String getWriteableName() {
return "broken";
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_EMPTY;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new ElasticsearchException(EXCEPTION_MESSAGE);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public void testClusterRecoversAfterExceptionDuringSerialization() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5)); // 1-node cluster doesn't do any serialization
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader1 = cluster.getAnyLeader();
logger.info("--> submitting broken task to [{}]", leader1);
final AtomicBoolean failed = new AtomicBoolean();
leader1.submitUpdateTask("broken-task",
cs -> ClusterState.builder(cs).putCustom("broken", new BrokenCustom()).build(),
(source, e) -> {
assertThat(e.getCause(), instanceOf(ElasticsearchException.class));
assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
failed.set(true);
});
cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
assertTrue(failed.get());
cluster.stabilise();
final ClusterNode leader2 = cluster.getAnyLeader();
long finalValue = randomLong();
logger.info("--> submitting value [{}] to [{}]", finalValue, leader2);
leader2.submitValue(finalValue);
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
for (final ClusterNode clusterNode : cluster.clusterNodes) {
final String nodeId = clusterNode.getId();
final ClusterState appliedState = clusterNode.getLastAppliedClusterState();
assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue));
}
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}