Zen2: Add diff-based publishing (#35290)

Enables diff-based publishing, which is an optimization where only the changing parts of the cluster
state are published to the nodes in the cluster, falling back to full cluster state publishing if the
receiver does not have the previous cluster state.
This commit is contained in:
Yannick Welsch 2018-11-08 17:16:09 +01:00 committed by GitHub
parent 6885a7cb0f
commit c315ead0ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 444 additions and 64 deletions

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -56,6 +57,7 @@ import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.UnicastConfiguredHostsResolver;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
@ -122,7 +124,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Set<Consumer<Iterable<DiscoveryNode>>> discoveredNodesListeners = newConcurrentSet();
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
AllocationService allocationService, MasterService masterService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
ClusterApplier clusterApplier, Random random) {
super(settings);
@ -141,7 +143,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
@ -472,8 +475,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
public DiscoveryStats stats() {
// TODO implement
return null;
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
}
@Override
@ -794,8 +796,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
getLocalNode() + " should be in published " + clusterState;
final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener,
publishListener);
final PublicationTransportHandler.PublicationContext publicationContext =
publicationHandler.newPublicationContext(clusterChangedEvent);
final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
new ListenableFuture<>(), ackListener, publishListener);
currentPublication = Optional.of(publication);
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() {
@ -937,14 +941,15 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final ListenableFuture<Void> localNodeAckEvent;
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private final PublicationTransportHandler.PublicationContext publicationContext;
// We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
// safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
private final List<Join> receivedJoins = new ArrayList<>();
private boolean receivedJoinsProcessed;
CoordinatorPublication(PublishRequest publishRequest, ListenableFuture<Void> localNodeAckEvent, AckListener ackListener,
ActionListener<Void> publishListener) {
CoordinatorPublication(PublishRequest publishRequest, PublicationTransportHandler.PublicationContext publicationContext,
ListenableFuture<Void> localNodeAckEvent, AckListener ackListener, ActionListener<Void> publishListener) {
super(publishRequest,
new AckListener() {
@Override
@ -970,6 +975,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
},
transportService.getThreadPool()::relativeTimeInMillis);
this.publishRequest = publishRequest;
this.publicationContext = publicationContext;
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;
@ -1098,7 +1104,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
publicationContext.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener));
}
@Override

View File

@ -18,11 +18,31 @@
*/
package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
@ -30,6 +50,10 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
@ -39,15 +63,24 @@ public class PublicationTransportHandler extends AbstractComponent {
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
private final TransportService transportService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
public PublicationTransportHandler(TransportService transportService,
private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
this.transportService = transportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.handlePublishRequest = handlePublishRequest;
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
in -> new PublishRequest(in, transportService.getLocalNode()),
(request, channel, task) -> channel.sendResponse(handlePublishRequest.apply(request)));
transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.GENERIC,
false, false, (request, channel, task) -> handleIncomingPublishRequest(request, channel));
transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
ApplyCommitRequest::new,
@ -74,37 +107,228 @@ public class PublicationTransportHandler extends AbstractComponent {
}));
}
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener) {
// TODO: serialize and compress state similar as in PublishClusterStateAction
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, publishRequest, options,
new TransportResponseHandler<PublishWithJoinResponse>() {
public PublishClusterStateStats stats() {
return new PublishClusterStateStats(
fullClusterStateReceivedCount.get(),
incompatibleClusterStateDiffReceivedCount.get(),
compatibleClusterStateDiffReceivedCount.get());
}
@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}
public interface PublicationContext {
@Override
public void handleResponse(PublishWithJoinResponse response) {
responseActionListener.onResponse(response);
}
void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
final DiscoveryNodes nodes = clusterChangedEvent.state().nodes();
final ClusterState newState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
final boolean sendFullVersion = clusterChangedEvent.previousState().getBlocks().disableStatePersistence();
final Map<Version, BytesReference> serializedStates = new HashMap<>();
final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
// we build these early as a best effort not to commit in the case of error.
// sadly this is not water tight as it may that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
nodes, sendFullVersion, serializedStates, serializedDiffs);
return (destination, publishRequest, responseActionListener) -> {
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
if (destination.equals(nodes.getLocalNode())) {
// the master needs the original non-serialized state as the cluster state contains some volatile information that we don't
// want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or because it's
// mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in snapshot code).
// TODO: look into these and check how to get rid of them
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// wrap into fake TransportException, as that's what we expect in Publication
responseActionListener.onFailure(new TransportException(e));
}
@Override
protected void doRun() {
responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
}
@Override
public String toString() {
return "publish to self of " + publishRequest;
}
});
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
sendFullClusterState(newState, serializedStates, destination, responseActionListener);
} else {
sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, responseActionListener);
}
};
}
private void sendClusterStateToNode(ClusterState clusterState, BytesReference bytes, DiscoveryNode node,
ActionListener<PublishWithJoinResponse> responseActionListener, boolean sendDiffs,
Map<Version, BytesReference> serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
options,
new TransportResponseHandler<PublishWithJoinResponse>() {
@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}
@Override
public void handleResponse(PublishWithJoinResponse response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, serializedStates, node, responseActionListener);
} else {
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
responseActionListener.onFailure(exp);
}
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);
}
}
private static void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, DiscoveryNodes discoveryNodes,
boolean sendFullVersion, Map<Version, BytesReference> serializedStates,
Map<Version, BytesReference> serializedDiffs) {
Diff<ClusterState> diff = null;
for (DiscoveryNode node : discoveryNodes) {
if (node.equals(discoveryNodes.getLocalNode())) {
// ignore, see newPublicationContext
continue;
}
try {
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
serializedStates.putIfAbsent(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
} else {
// will send a diff
if (diff == null) {
diff = clusterState.diff(previousState);
}
serializedDiffs.putIfAbsent(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
}
});
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
}
}
}
private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, ActionListener<PublishWithJoinResponse> responseActionListener) {
BytesReference bytes = serializedStates.get(node.getVersion());
if (bytes == null) {
try {
bytes = serializeFullClusterState(clusterState, node.getVersion());
serializedStates.put(node.getVersion(), bytes);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state before publishing it to node {}", node), e);
responseActionListener.onFailure(e);
return;
}
}
sendClusterStateToNode(clusterState, bytes, node, responseActionListener, false, serializedStates);
}
private void sendClusterStateDiff(ClusterState clusterState,
Map<Version, BytesReference> serializedDiffs, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, ActionListener<PublishWithJoinResponse> responseActionListener) {
final BytesReference bytes = serializedDiffs.get(node.getVersion());
assert bytes != null : "failed to find serialized diff for node " + node + " of version [" + node.getVersion() + "]";
sendClusterStateToNode(clusterState, bytes, node, responseActionListener, true, serializedStates);
}
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(true);
clusterState.writeTo(stream);
}
return bStream.bytes();
}
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
}
return bStream.bytes();
}
private void handleIncomingPublishRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
try {
if (compressor != null) {
in = compressor.streamInput(in);
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
lastSeenClusterState.set(incomingState);
} else {
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
throw new IncompatibleClusterStateVersionException("have no local cluster state");
} else {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
} finally {
IOUtils.close(in);
}
channel.sendResponse(handlePublishRequest.apply(new PublishRequest(incomingState)));
}
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
ActionListener<TransportResponse.Empty> responseActionListener) {
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, options,
new TransportResponseHandler<TransportResponse.Empty>() {
@ -130,5 +354,4 @@ public class PublicationTransportHandler extends AbstractComponent {
}
});
}
}

View File

@ -19,18 +19,14 @@
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
import java.util.Objects;
/**
* Request which is used by the master node to publish cluster state changes.
* Actual serialization of this request is done by {@link PublicationTransportHandler}
*/
public class PublishRequest extends TransportRequest {
public class PublishRequest {
private final ClusterState acceptedState;
@ -38,17 +34,6 @@ public class PublishRequest extends TransportRequest {
this.acceptedState = acceptedState;
}
public PublishRequest(StreamInput in, DiscoveryNode localNode) throws IOException {
super(in);
acceptedState = ClusterState.readFrom(in, localNode);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
acceptedState.writeTo(out);
}
public ClusterState getAcceptedState() {
return acceptedState;
}

View File

@ -74,11 +74,11 @@ public class PublishClusterStateStats implements Writeable, ToXContentObject {
return builder;
}
long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; }
public long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; }
long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; }
public long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; }
long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; }
public long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; }
@Override
public String toString() {

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
@ -64,6 +65,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
@ -762,6 +764,81 @@ public class CoordinatorTests extends ESTestCase {
cluster.stabilise();
}
public void testDiffBasedPublishing() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final long finalValue = randomLong();
final Map<ClusterNode, PublishClusterStateStats> prePublishStats = cluster.clusterNodes.stream().collect(
Collectors.toMap(Function.identity(), cn -> cn.coordinator.stats().getPublishStats()));
logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
leader.submitValue(finalValue);
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
final Map<ClusterNode, PublishClusterStateStats> postPublishStats = cluster.clusterNodes.stream().collect(
Collectors.toMap(Function.identity(), cn -> cn.coordinator.stats().getPublishStats()));
for (ClusterNode cn : cluster.clusterNodes) {
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
if (cn == leader) {
// leader does not update publish stats as it's not using the serialized state
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
postPublishStats.get(cn).getFullClusterStateReceivedCount());
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
} else {
// followers receive a diff
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
postPublishStats.get(cn).getFullClusterStateReceivedCount());
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
}
}
}
public void testJoiningNodeReceivesFullState() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
cluster.stabilise();
cluster.addNodesAndStabilise(1);
final ClusterNode newNode = cluster.clusterNodes.get(cluster.clusterNodes.size() - 1);
final PublishClusterStateStats newNodePublishStats = newNode.coordinator.stats().getPublishStats();
// initial cluster state send when joining
assertEquals(1L, newNodePublishStats.getFullClusterStateReceivedCount());
// possible follow-up reconfiguration was published as a diff
assertEquals(cluster.size() % 2, newNodePublishStats.getCompatibleClusterStateDiffReceivedCount());
assertEquals(0L, newNodePublishStats.getIncompatibleClusterStateDiffReceivedCount());
}
public void testIncompatibleDiffResendsFullState() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
follower.blackhole();
final PublishClusterStateStats prePublishStats = follower.coordinator.stats().getPublishStats();
leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY + defaultMillis(PUBLISH_TIMEOUT_SETTING), "publish first state");
follower.heal();
leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
final PublishClusterStateStats postPublishStats = follower.coordinator.stats().getPublishStats();
assertEquals(prePublishStats.getFullClusterStateReceivedCount() + 1,
postPublishStats.getFullClusterStateReceivedCount());
assertEquals(prePublishStats.getCompatibleClusterStateDiffReceivedCount(),
postPublishStats.getCompatibleClusterStateDiffReceivedCount());
assertEquals(prePublishStats.getIncompatibleClusterStateDiffReceivedCount() + 1,
postPublishStats.getIncompatibleClusterStateDiffReceivedCount());
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -1294,7 +1371,7 @@ public class CoordinatorTests extends ESTestCase {
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService,
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get());
masterService.setClusterStatePublisher(coordinator);

View File

@ -70,11 +70,10 @@ public class MessagesTests extends ESTestCase {
});
}
public void testPublishRequestEqualsHashCodeSerialization() {
public void testPublishRequestEqualsHashCode() {
PublishRequest initialPublishRequest = new PublishRequest(randomClusterState());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishRequest,
publishRequest -> copyWriteable(publishRequest, writableRegistry(),
in -> new PublishRequest(in, publishRequest.getAcceptedState().nodes().getLocalNode())),
publishRequest -> new PublishRequest(publishRequest.getAcceptedState()),
in -> new PublishRequest(randomClusterState()));
}

View File

@ -163,7 +163,7 @@ public class NodeJoinTests extends ESTestCase {
x -> initialState.nodes().getLocalNode(),
clusterSettings, Collections.emptySet());
coordinator = new Coordinator("test_node", Settings.EMPTY, clusterSettings,
transportService,
transportService, writableRegistry(),
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
masterService,
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
public class PublicationTransportHandlerTests extends ESTestCase {
public void testDiffSerializationFailure() {
DeterministicTaskQueue deterministicTaskQueue =
new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DiscoveryNode localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT);
final TransportService transportService = new CapturingTransport().createTransportService(Settings.EMPTY,
deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> localNode,
clusterSettings, Collections.emptySet());
final PublicationTransportHandler handler = new PublicationTransportHandler(transportService,
writableRegistry(), pu -> null, (pu, l) -> {});
transportService.start();
transportService.acceptIncomingRequests();
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
final ClusterState clusterState = CoordinationStateTests.clusterState(2L, 1L,
DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(),
ClusterState.VotingConfiguration.EMPTY_CONFIG, ClusterState.VotingConfiguration.EMPTY_CONFIG, 0L);
final ClusterState unserializableClusterState = new ClusterState(clusterState.term(), clusterState.version(),
clusterState.stateUUID(), clusterState) {
@Override
public Diff<ClusterState> diff(ClusterState previousState) {
return new Diff<ClusterState>() {
@Override
public ClusterState apply(ClusterState part) {
fail("this diff shouldn't be applied");
return part;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new IOException("Simulated failure of diff serialization");
}
};
}
};
ElasticsearchException e = expectThrows(ElasticsearchException.class, () ->
handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState)));
assertNotNull(e.getCause());
assertThat(e.getCause(), instanceOf(IOException.class));
assertThat(e.getCause().getMessage(), containsString("Simulated failure of diff serialization"));
}
}

View File

@ -86,8 +86,9 @@ public class TestZenDiscovery extends ZenDiscovery {
() -> new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(transportService.getLocalNode())
.localNodeId(transportService.getLocalNode().getId()).build()).build());
return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, allocationService, masterService,
persistedStateSupplier, hostsProvider, clusterApplier, new Random(Randomness.get().nextLong()));
return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry,
allocationService, masterService, persistedStateSupplier, hostsProvider, clusterApplier,
new Random(Randomness.get().nextLong()));
} else {
return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService);