diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 32867fae78d..adddfb1a376 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -140,8 +140,7 @@ public class PublishClusterStateAction extends AbstractComponent { throw t; } catch (Throwable t) { // try to fail committing, in cause it's still on going - sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]"); - if (sendingController.isCommitted() == false) { + if (sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]")) { // signal the change should be rejected throw new Discovery.FailedToCommitClusterStateException("unexpected error [{}]", t, t.getMessage()); } else { @@ -215,7 +214,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } - private void sendFullClusterState(ClusterState clusterState, @Nullable Map serializedStates, + private void sendFullClusterState(ClusterState clusterState, Map serializedStates, DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) { BytesReference bytes = serializedStates.get(node.version()); if (bytes == null) { @@ -247,13 +246,14 @@ public class PublishClusterStateAction extends AbstractComponent { final SendingController sendingController, final boolean sendDiffs, final Map serializedStates) { try { + + // -> no need to put a timeout on the options here, because we want the response to eventually be received + // and not log an error if it arrives after the timeout + // -> no need to compress, we already compressed the bytes TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); - // no need to put a timeout on the options here, because we want the response to eventually be received - // and not log an error if it arrives after the timeout transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.version()), - options, // no need to compress, we already compressed the bytes - + options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override @@ -284,13 +284,12 @@ public class PublishClusterStateAction extends AbstractComponent { private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) { try { logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); - TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); + TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, COMMIT_ACTION_NAME, new CommitClusterStateRequest(clusterState.stateUUID()), - options, // no need to compress, we already compressed the bytes - + options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java index 0ce3fd45a36..61b1062dbed 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.transport.DummyTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -46,7 +45,6 @@ public class ZenDiscoveryUnitTest extends ESTestCase { DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder(); currentNodes.masterNodeId("a").put(new DiscoveryNode("a", DummyTransportAddress.INSTANCE, Version.CURRENT)); - ; DiscoveryNodes.Builder newNodes = DiscoveryNodes.builder(); newNodes.masterNodeId("a").put(new DiscoveryNode("a", DummyTransportAddress.INSTANCE, Version.CURRENT));