more feedback

This commit is contained in:
Boaz Leskes 2015-08-27 11:29:00 +02:00
parent c9ee8dbd16
commit 0668e0d623
2 changed files with 9 additions and 12 deletions

View File

@ -140,8 +140,7 @@ public class PublishClusterStateAction extends AbstractComponent {
throw t;
} catch (Throwable t) {
// try to fail committing, in cause it's still on going
sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]");
if (sendingController.isCommitted() == false) {
if (sendingController.markAsFailed("unexpected error [" + t.getMessage() + "]")) {
// signal the change should be rejected
throw new Discovery.FailedToCommitClusterStateException("unexpected error [{}]", t, t.getMessage());
} else {
@ -215,7 +214,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
}
private void sendFullClusterState(ClusterState clusterState, @Nullable Map<Version, BytesReference> serializedStates,
private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates,
DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
BytesReference bytes = serializedStates.get(node.version());
if (bytes == null) {
@ -247,13 +246,14 @@ public class PublishClusterStateAction extends AbstractComponent {
final SendingController sendingController,
final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.version()),
options, // no need to compress, we already compressed the bytes
options,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
@ -284,13 +284,12 @@ public class PublishClusterStateAction extends AbstractComponent {
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE);
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, COMMIT_ACTION_NAME,
new CommitClusterStateRequest(clusterState.stateUUID()),
options, // no need to compress, we already compressed the bytes
options,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -46,7 +45,6 @@ public class ZenDiscoveryUnitTest extends ESTestCase {
DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder();
currentNodes.masterNodeId("a").put(new DiscoveryNode("a", DummyTransportAddress.INSTANCE, Version.CURRENT));
;
DiscoveryNodes.Builder newNodes = DiscoveryNodes.builder();
newNodes.masterNodeId("a").put(new DiscoveryNode("a", DummyTransportAddress.INSTANCE, Version.CURRENT));