Remove anonymous PublicationContext implementation (#58412)

Today the `PublicationContext` interface has a single anonymous
implementation, and `PublicationTransportHandler` has various methods
that take the variables that this anonymous class captures. This commit
refactors this into a proper class with proper fields and moves the
relevant methods onto this class.

Backport of #58405 to 7.x.
This commit is contained in:
David Turner 2020-06-23 11:13:23 +01:00 committed by GitHub
parent 519d1278e2
commit 256b660f0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 235 additions and 231 deletions

View File

@ -72,14 +72,14 @@ public class PublicationTransportHandler {
private final NamedWriteableRegistry namedWriteableRegistry;
private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
// the master needs the original non-serialized state as the cluster state contains some volatile information that we
// don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
// because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
// snapshot code).
// TODO: look into these and check how to get rid of them
private AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();
private final AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
@ -155,235 +155,6 @@ public class PublicationTransportHandler {
compatibleClusterStateDiffReceivedCount.get());
}
/**
 * The operations used to deliver one cluster state publication to individual nodes. Instances are obtained from
 * {@code newPublicationContext(ClusterChangedEvent)}.
 */
public interface PublicationContext {
/**
 * Sends the given publish request to {@code destination}.
 *
 * @param destination            the node to send the request to
 * @param publishRequest         the request being published
 * @param responseActionListener notified with the node's response, or with the failure
 */
void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);
/**
 * Sends the given apply-commit request to {@code destination}.
 *
 * @param destination            the node to send the request to
 * @param applyCommitRequest     the commit request to send
 * @param responseActionListener notified with the (empty) response, or with the failure
 */
void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener);
}
/**
 * Creates a {@link PublicationContext} for publishing the state carried by {@code clusterChangedEvent}.
 *
 * The full states and diffs expected to be needed are serialized up front (one per node version) and captured by the
 * returned anonymous implementation, so all sends made through it share the same serialized bytes.
 *
 * @param clusterChangedEvent supplies the new state, the previous state, and the set of nodes to publish to
 * @return a context whose methods send publish and commit requests for this state
 * @throws ElasticsearchException if the up-front serialization fails with an {@code IOException}
 */
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
final DiscoveryNodes nodes = clusterChangedEvent.state().nodes();
final ClusterState newState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
// when the previous state's blocks disable state persistence, every node is sent the full state rather than a diff
final boolean sendFullVersion = clusterChangedEvent.previousState().getBlocks().disableStatePersistence();
// serialized bytes are cached per node version: one map for full states, one for diffs
final Map<Version, BytesReference> serializedStates = new HashMap<>();
final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
// We build these early as a best effort not to commit in the case of an error.
// Sadly this is not watertight, as it may be that a failed diff-based publication to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
nodes, sendFullVersion, serializedStates, serializedDiffs);
return new PublicationContext() {
@Override
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> originalListener) {
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
assert transportService.getThreadPool().getThreadContext().isSystemContext();
final ActionListener<PublishWithJoinResponse> responseActionListener;
if (destination.equals(nodes.getLocalNode())) {
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
// we might override an in-flight publication to self in case where we failed as master and became master again,
// and the new publication started before the previous one completed (which fails anyhow because of higher current term)
assert previousRequest == null || previousRequest.getAcceptedState().term() < publishRequest.getAcceptedState().term();
// wrap the listener so that the reference to this request is cleared on completion, whether it succeeded or failed
responseActionListener = new ActionListener<PublishWithJoinResponse>() {
@Override
public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
currentPublishRequestToSelf.compareAndSet(publishRequest, null); // only clean-up our mess
originalListener.onResponse(publishWithJoinResponse);
}
@Override
public void onFailure(Exception e) {
currentPublishRequestToSelf.compareAndSet(publishRequest, null); // only clean-up our mess
originalListener.onFailure(e);
}
};
} else {
responseActionListener = originalListener;
}
// a diff is only sent to nodes that were already present in the previous state, and only if full states are not forced
if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
} else {
logger.trace("sending cluster state diff for version {} to {}", newState.version(), destination);
PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination,
responseActionListener);
}
}
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
assert transportService.getThreadPool().getThreadContext().isSystemContext();
final String actionName;
final TransportRequest transportRequest;
// Zen1 nodes are sent a different action and request type, identifying the state by its UUID
if (Coordinator.isZen1Node(destination)) {
actionName = PublishClusterStateAction.COMMIT_ACTION_NAME;
transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
} else {
actionName = COMMIT_STATE_ACTION_NAME;
transportRequest = applyCommitRequest;
}
transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) {
// nothing is read from the stream; the shared empty instance is returned
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse.Empty response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
responseActionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
});
}
};
}
/**
 * Sends already-serialized cluster state bytes (a full state or a diff) to {@code node}.
 *
 * @param clusterState           the state being published; used for the Zen1 response and for a full-state resend
 * @param bytes                  the serialized payload to send
 * @param node                   the destination node
 * @param responseActionListener notified with the node's response, or with the failure
 * @param sendDiffs              whether {@code bytes} is a diff; if so, a failure caused by
 *                               {@code IncompatibleClusterStateVersionException} triggers a resend of the full state
 * @param serializedStates       per-version cache of serialized full states, passed on to the full-state resend
 */
private void sendClusterStateToNode(ClusterState clusterState, BytesReference bytes, DiscoveryNode node,
ActionListener<PublishWithJoinResponse> responseActionListener, boolean sendDiffs,
Map<Version, BytesReference> serializedStates) {
try {
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
// decides between falling back to a full state and reporting the failure to the listener
final Consumer<TransportException> transportExceptionHandler = exp -> {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, serializedStates, node, responseActionListener);
} else {
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
responseActionListener.onFailure(exp);
}
};
final TransportResponseHandler<PublishWithJoinResponse> publishWithJoinResponseHandler =
new TransportResponseHandler<PublishWithJoinResponse>() {
@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}
@Override
public void handleResponse(PublishWithJoinResponse response) {
responseActionListener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}
@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
final String actionName;
final TransportResponseHandler<?> transportResponseHandler;
if (Coordinator.isZen1Node(node)) {
actionName = PublishClusterStateAction.SEND_ACTION_NAME;
// the Zen1 action is read as an empty response, so the handler is wrapped to build a PublishWithJoinResponse
// (including a Join from the destination to the local node) out of the published state's term and version
transportResponseHandler = publishWithJoinResponseHandler.wrap(empty -> new PublishWithJoinResponse(
new PublishResponse(clusterState.term(), clusterState.version()),
Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(), clusterState.term(),
clusterState.version()))), in -> TransportResponse.Empty.INSTANCE);
} else {
actionName = PUBLISH_STATE_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler;
}
transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
} catch (Exception e) {
// anything thrown while preparing or dispatching the request is reported to the listener rather than propagated
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);
}
}
/**
 * Fills the two per-version caches with the serialized form each node in {@code discoveryNodes} is expected to need.
 *
 * A node gets a full state when {@code sendFullVersion} is set or when it is absent from {@code previousState};
 * otherwise it gets a diff. The diff itself is computed at most once, on first use.
 *
 * @param clusterState     the state being published
 * @param previousState    the state to diff against
 * @param discoveryNodes   the nodes to prepare serializations for
 * @param sendFullVersion  whether every node must receive a full state
 * @param serializedStates receives serialized full states, keyed by node version
 * @param serializedDiffs  receives serialized diffs, keyed by node version
 * @throws ElasticsearchException if serialization fails with an {@code IOException}
 */
private static void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, DiscoveryNodes discoveryNodes,
                                                boolean sendFullVersion, Map<Version, BytesReference> serializedStates,
                                                Map<Version, BytesReference> serializedDiffs) {
    Diff<ClusterState> stateDiff = null;
    for (DiscoveryNode target : discoveryNodes) {
        try {
            final Version targetVersion = target.getVersion();
            final boolean wantsFullState = sendFullVersion || previousState.nodes().nodeExists(target) == false;
            if (wantsFullState == false) {
                // diff path: compute the diff lazily, then serialize it once per node version
                if (stateDiff == null) {
                    stateDiff = clusterState.diff(previousState);
                }
                if (serializedDiffs.containsKey(targetVersion) == false) {
                    serializedDiffs.put(targetVersion, serializeDiffClusterState(stateDiff, targetVersion));
                }
            } else if (serializedStates.containsKey(targetVersion) == false) {
                // full-state path: serialize once per node version
                serializedStates.put(targetVersion, serializeFullClusterState(clusterState, targetVersion));
            }
        } catch (IOException e) {
            throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, target);
        }
    }
}
/**
 * Sends the full cluster state to {@code node}, reusing the cached serialization for the node's version if there is one
 * and otherwise serializing the state now and caching the result.
 *
 * @param clusterState           the state to send
 * @param serializedStates       per-version cache of serialized full states; updated on a cache miss
 * @param node                   the destination node
 * @param responseActionListener notified of the outcome; receives the exception if serialization fails
 */
private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates,
                                  DiscoveryNode node, ActionListener<PublishWithJoinResponse> responseActionListener) {
    final Version targetVersion = node.getVersion();
    BytesReference payload = serializedStates.get(targetVersion);
    if (payload == null) {
        // cache miss: serialize on demand, giving up on this node if that fails
        try {
            payload = serializeFullClusterState(clusterState, targetVersion);
            serializedStates.put(targetVersion, payload);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state before publishing it to node {}", node), e);
            responseActionListener.onFailure(e);
            return;
        }
    }
    sendClusterStateToNode(clusterState, payload, node, responseActionListener, false, serializedStates);
}
/**
 * Sends the pre-serialized cluster state diff for the node's version to {@code node}. The diff must already be present
 * in {@code serializedDiffs}; this is only checked by an assertion.
 *
 * @param clusterState           the state being published
 * @param serializedDiffs        per-version cache of serialized diffs
 * @param serializedStates       per-version cache of serialized full states, passed on to the send
 * @param node                   the destination node
 * @param responseActionListener notified of the outcome
 */
private void sendClusterStateDiff(ClusterState clusterState,
                                  Map<Version, BytesReference> serializedDiffs, Map<Version, BytesReference> serializedStates,
                                  DiscoveryNode node, ActionListener<PublishWithJoinResponse> responseActionListener) {
    final Version targetVersion = node.getVersion();
    final BytesReference diffPayload = serializedDiffs.get(targetVersion);
    assert diffPayload != null : "failed to find serialized diff for node " + node + " of version [" + targetVersion + "]";
    sendClusterStateToNode(clusterState, diffPayload, node, responseActionListener, true, serializedStates);
}
/**
 * Serializes a full cluster state through the compressing stream, in the wire format of {@code nodeVersion}.
 *
 * @param clusterState the state to serialize
 * @param nodeVersion  the version set on the output stream before writing
 * @return the bytes written, read from the underlying buffer after the compressing stream has been closed
 * @throws IOException if writing to the stream fails
 */
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
    final BytesStreamOutput buffer = new BytesStreamOutput();
    try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(buffer)) {
        compressed.setVersion(nodeVersion);
        // leading boolean: true here, whereas the diff serializer writes false
        compressed.writeBoolean(true);
        clusterState.writeTo(compressed);
    }
    return buffer.bytes();
}
/**
 * Serializes a cluster state diff through the compressing stream, in the wire format of {@code nodeVersion}.
 *
 * @param diff        the diff to serialize
 * @param nodeVersion the version set on the output stream before writing
 * @return the bytes written, read from the underlying buffer after the compressing stream has been closed
 * @throws IOException if writing to the stream fails
 */
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
    final BytesStreamOutput buffer = new BytesStreamOutput();
    try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(buffer)) {
        compressed.setVersion(nodeVersion);
        // leading boolean: false here, whereas the full-state serializer writes true
        compressed.writeBoolean(false);
        diff.writeTo(compressed);
    }
    return buffer.bytes();
}
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
@ -451,4 +222,237 @@ public class PublicationTransportHandler {
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}
/**
 * Creates the {@link PublicationContext} for publishing the state carried by {@code clusterChangedEvent}, with the
 * serializations it is expected to need already built.
 *
 * @param clusterChangedEvent supplies the new and previous cluster states
 * @return the prepared publication context
 */
public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
    final PublicationContext context = new PublicationContext(clusterChangedEvent);
    // Serialize eagerly so that a serialization error fails the publication immediately. This is not a guarantee:
    // diffs are sent on a best-effort basis, and a failed diff-based send may fall back to a full state that still
    // has to be serialized at that point.
    context.buildDiffAndSerializeStates();
    return context;
}
/**
 * Serializes a full cluster state through the compressing stream, in the wire format of {@code nodeVersion}, and logs
 * the resulting size at trace level.
 *
 * @param clusterState the state to serialize
 * @param nodeVersion  the version set on the output stream before writing
 * @return the bytes written, read from the underlying buffer after the compressing stream has been closed
 * @throws IOException if writing to the stream fails
 */
private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
    final BytesStreamOutput buffer = new BytesStreamOutput();
    try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(buffer)) {
        compressed.setVersion(nodeVersion);
        // leading boolean: true here, whereas the diff serializer writes false
        compressed.writeBoolean(true);
        clusterState.writeTo(compressed);
    }
    final BytesReference result = buffer.bytes();
    logger.trace("serialized full cluster state version [{}] for node version [{}] with size [{}]",
        clusterState.version(), nodeVersion, result.length());
    return result;
}
/**
 * Serializes a cluster state diff through the compressing stream, in the wire format of {@code nodeVersion}.
 *
 * @param diff        the diff to serialize
 * @param nodeVersion the version set on the output stream before writing
 * @return the bytes written, read from the underlying buffer after the compressing stream has been closed
 * @throws IOException if writing to the stream fails
 */
private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
    final BytesStreamOutput buffer = new BytesStreamOutput();
    try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(buffer)) {
        compressed.setVersion(nodeVersion);
        // leading boolean: false here, whereas the full-state serializer writes true
        compressed.writeBoolean(false);
        diff.writeTo(compressed);
    }
    return buffer.bytes();
}
/**
 * Publishing a cluster state typically involves sending the same cluster state (or diff) to every node, so the work of diffing,
 * serializing, and compressing the state can be done once and the results shared across publish requests. The
 * {@code PublicationContext} implements this sharing.
 *
 * Serialized full states and serialized diffs are cached separately, each keyed by the destination node's {@link Version}.
 */
public class PublicationContext {

    /** The nodes of the new state; iterated when pre-building serializations and used to identify the local node. */
    private final DiscoveryNodes discoveryNodes;
    /** The state being published. */
    private final ClusterState newState;
    /** The state that diffs are computed against. */
    private final ClusterState previousState;
    /** Whether every node must be sent a full state; set when the previous state's blocks disable state persistence. */
    private final boolean sendFullVersion;
    // NOTE(review): sendFullClusterState() may write to this plain HashMap from a transport response handler (the
    // diff-to-full fallback, whose handler names the GENERIC executor) -- confirm whether concurrent access can occur.
    private final Map<Version, BytesReference> serializedStates = new HashMap<>();
    private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();

    /**
     * @param clusterChangedEvent supplies the new state (and its nodes) and the previous state
     */
    PublicationContext(ClusterChangedEvent clusterChangedEvent) {
        discoveryNodes = clusterChangedEvent.state().nodes();
        newState = clusterChangedEvent.state();
        previousState = clusterChangedEvent.previousState();
        sendFullVersion = previousState.getBlocks().disableStatePersistence();
    }

    /**
     * Serializes, once per node version, the full state or diff that each node of the new state is expected to need.
     * The diff is computed at most once, the first time a node needing a diff is encountered.
     *
     * @throws ElasticsearchException if serialization fails with an {@code IOException}
     */
    void buildDiffAndSerializeStates() {
        Diff<ClusterState> diff = null;
        for (DiscoveryNode node : discoveryNodes) {
            try {
                if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
                    if (serializedStates.containsKey(node.getVersion()) == false) {
                        serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
                    }
                } else {
                    // will send a diff
                    if (diff == null) {
                        diff = newState.diff(previousState);
                    }
                    if (serializedDiffs.containsKey(node.getVersion()) == false) {
                        final BytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
                        serializedDiffs.put(node.getVersion(), serializedDiff);
                        // message previously read "... for version [{}] in for node version ..."; stray "in" removed
                        logger.trace("serialized cluster state diff for version [{}] for node version [{}] with size [{}]",
                            newState.version(), node.getVersion(), serializedDiff.length());
                    }
                }
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
            }
        }
    }

    /**
     * Sends the publication to {@code destination}, as a full state or as a diff.
     *
     * A full state is sent when {@code sendFullVersion} is set or the destination is absent from the previous state;
     * otherwise the diff is sent. When the destination is the local node, the request is also recorded in
     * {@code currentPublishRequestToSelf} for the duration of the send.
     *
     * @param destination    the node to publish to
     * @param publishRequest the request being published; its accepted state must be this context's new state
     * @param listener       notified with the node's response, or with the failure
     */
    public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
                                   ActionListener<PublishWithJoinResponse> listener) {
        assert publishRequest.getAcceptedState() == newState : "state got switched on us";
        assert transportService.getThreadPool().getThreadContext().isSystemContext();
        final ActionListener<PublishWithJoinResponse> responseActionListener;
        if (destination.equals(discoveryNodes.getLocalNode())) {
            // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
            final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
            // we might override an in-flight publication to self in case where we failed as master and became master again,
            // and the new publication started before the previous one completed (which fails anyhow because of higher current term)
            assert previousRequest == null || previousRequest.getAcceptedState().term() < publishRequest.getAcceptedState().term();
            // wrap the listener so that the reference to this request is cleared on completion, whether it succeeded or failed
            responseActionListener = new ActionListener<PublishWithJoinResponse>() {
                @Override
                public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
                    currentPublishRequestToSelf.compareAndSet(publishRequest, null); // only clean-up our mess
                    listener.onResponse(publishWithJoinResponse);
                }

                @Override
                public void onFailure(Exception e) {
                    currentPublishRequestToSelf.compareAndSet(publishRequest, null); // only clean-up our mess
                    listener.onFailure(e);
                }
            };
        } else {
            responseActionListener = listener;
        }
        if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
            logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
            sendFullClusterState(destination, responseActionListener);
        } else {
            logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
            sendClusterStateDiff(destination, responseActionListener);
        }
    }

    /**
     * Sends the commit for this publication to {@code destination}. Zen1 nodes are sent a
     * {@code CommitClusterStateRequest} carrying the new state's UUID; all other nodes are sent
     * {@code applyCommitRequest} unchanged.
     *
     * @param destination        the node to send the commit to
     * @param applyCommitRequest the commit request used for non-Zen1 nodes
     * @param listener           notified with the (empty) response, or with the failure
     */
    public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
                                ActionListener<TransportResponse.Empty> listener) {
        assert transportService.getThreadPool().getThreadContext().isSystemContext();
        final String actionName;
        final TransportRequest transportRequest;
        if (Coordinator.isZen1Node(destination)) {
            actionName = PublishClusterStateAction.COMMIT_ACTION_NAME;
            transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
        } else {
            actionName = COMMIT_STATE_ACTION_NAME;
            transportRequest = applyCommitRequest;
        }
        transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
            new TransportResponseHandler<TransportResponse.Empty>() {

                @Override
                public TransportResponse.Empty read(StreamInput in) {
                    // nothing is read from the stream; the shared empty instance is returned
                    return TransportResponse.Empty.INSTANCE;
                }

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    listener.onResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.GENERIC;
                }
            });
    }

    /**
     * Sends the full state to {@code destination}, reusing the cached serialization for its version if present and
     * otherwise serializing (and caching) it now. A serialization failure is logged and passed to {@code listener}.
     */
    private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
        BytesReference bytes = serializedStates.get(destination.getVersion());
        if (bytes == null) {
            try {
                bytes = serializeFullClusterState(newState, destination.getVersion());
                serializedStates.put(destination.getVersion(), bytes);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage(
                    "failed to serialize cluster state before publishing it to node {}", destination), e);
                listener.onFailure(e);
                return;
            }
        }
        sendClusterState(destination, bytes, false, listener);
    }

    /**
     * Sends the pre-serialized diff for the destination's version, allowing a fallback to the full state. The diff is
     * expected to have been built already; this is only checked by an assertion.
     */
    private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
        final BytesReference bytes = serializedDiffs.get(destination.getVersion());
        assert bytes != null
            : "failed to find serialized diff for node " + destination + " of version [" + destination.getVersion() + "]";
        sendClusterState(destination, bytes, true, listener);
    }

    /**
     * Sends already-serialized bytes (a full state or a diff) to {@code destination}.
     *
     * @param destination                        the node to send to
     * @param bytes                              the serialized payload
     * @param retryWithFullClusterStateOnFailure if set, a failure caused by {@code IncompatibleClusterStateVersionException}
     *                                           triggers a resend of the full state instead of failing the listener
     * @param listener                           notified with the node's response, or with the failure
     */
    private void sendClusterState(DiscoveryNode destination, BytesReference bytes, boolean retryWithFullClusterStateOnFailure,
                                  ActionListener<PublishWithJoinResponse> listener) {
        try {
            final BytesTransportRequest request = new BytesTransportRequest(bytes, destination.getVersion());
            // decides between falling back to a full state and reporting the failure to the listener
            final Consumer<TransportException> transportExceptionHandler = exp -> {
                if (retryWithFullClusterStateOnFailure && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                    logger.debug("resending full cluster state to node {} reason {}", destination, exp.getDetailedMessage());
                    sendFullClusterState(destination, listener);
                } else {
                    logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", destination), exp);
                    listener.onFailure(exp);
                }
            };
            final TransportResponseHandler<PublishWithJoinResponse> responseHandler =
                new TransportResponseHandler<PublishWithJoinResponse>() {

                    @Override
                    public PublishWithJoinResponse read(StreamInput in) throws IOException {
                        return new PublishWithJoinResponse(in);
                    }

                    @Override
                    public void handleResponse(PublishWithJoinResponse response) {
                        listener.onResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        transportExceptionHandler.accept(exp);
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.GENERIC;
                    }
                };
            final String actionName;
            final TransportResponseHandler<?> transportResponseHandler;
            if (Coordinator.isZen1Node(destination)) {
                actionName = PublishClusterStateAction.SEND_ACTION_NAME;
                // the Zen1 action is read as an empty response, so the handler is wrapped to build a PublishWithJoinResponse
                // (including a Join from the destination to the local node) out of the new state's term and version
                transportResponseHandler = responseHandler.wrap(empty -> new PublishWithJoinResponse(
                    new PublishResponse(newState.term(), newState.version()),
                    Optional.of(new Join(destination, transportService.getLocalNode(), newState.term(), newState.term(),
                        newState.version()))), in -> TransportResponse.Empty.INSTANCE);
            } else {
                actionName = PUBLISH_STATE_ACTION_NAME;
                transportResponseHandler = responseHandler;
            }
            transportService.sendRequest(destination, actionName, request, stateRequestOptions, transportResponseHandler);
        } catch (Exception e) {
            // anything thrown while preparing or dispatching the request is reported to the listener rather than propagated
            logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
            listener.onFailure(e);
        }
    }
}
}