Move all zen discovery classes into o.e.discovery.zen (#21032)
* Move all zen discovery classes into o.e.discovery.zen This collapses sub packages of zen into zen. These all had just a couple classes each, and there is really no reason to have the subpackages. * fix checkstyle
This commit is contained in:
parent
95b6f85c87
commit
53cff0f00f
|
@ -319,12 +319,6 @@
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoverySettings.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoverySettings.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]local[/\\]LocalDiscovery.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]local[/\\]LocalDiscovery.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscovery.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscovery.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]elect[/\\]ElectMasterService.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]FaultDetection.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]membership[/\\]MembershipAction.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ping[/\\]ZenPing.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PendingClusterStatesQueue.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateAction.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]ESFileStore.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]ESFileStore.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]AsyncShardFetch.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]AsyncShardFetch.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
|
||||||
|
@ -721,7 +715,6 @@
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateActionTests.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironmentTests.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironmentTests.java" checks="LineLength" />
|
||||||
|
|
|
@ -52,7 +52,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
|
|
@ -31,6 +31,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||||
|
import org.elasticsearch.discovery.zen.MasterFaultDetection;
|
||||||
|
import org.elasticsearch.discovery.zen.NodesFaultDetection;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
@ -45,8 +47,8 @@ import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;
|
||||||
* This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are
|
* This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are
|
||||||
* removed. Also, it periodically checks that all connections are still open and if needed restores them.
|
* removed. Also, it periodically checks that all connections are still open and if needed restores them.
|
||||||
* Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond
|
* Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond
|
||||||
* to pings. This is done by {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}. Master fault detection
|
* to pings. This is done by {@link NodesFaultDetection}. Master fault detection
|
||||||
* is done by {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection}.
|
* is done by {@link MasterFaultDetection}.
|
||||||
*/
|
*/
|
||||||
public class NodeConnectionsService extends AbstractLifecycleComponent {
|
public class NodeConnectionsService extends AbstractLifecycleComponent {
|
||||||
|
|
||||||
|
|
|
@ -56,8 +56,8 @@ import org.elasticsearch.discovery.DiscoveryModule;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
|
|
|
@ -25,12 +25,11 @@ import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.ExtensionPoint;
|
import org.elasticsearch.common.util.ExtensionPoint;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
|
||||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.ZenPing;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
import org.elasticsearch.discovery.zen.ZenPingService;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
|
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.discovery.zen.fd;
|
|
||||||
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -32,7 +33,7 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A base class for {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection} & {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection},
|
* A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
|
||||||
* making sure both use the same setting.
|
* making sure both use the same setting.
|
||||||
*/
|
*/
|
||||||
public abstract class FaultDetection extends AbstractComponent {
|
public abstract class FaultDetection extends AbstractComponent {
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.fd;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.membership;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -62,36 +62,42 @@ public class MembershipAction extends AbstractComponent {
|
||||||
|
|
||||||
private final MembershipListener listener;
|
private final MembershipListener listener;
|
||||||
|
|
||||||
public MembershipAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
|
public MembershipAction(Settings settings, TransportService transportService,
|
||||||
|
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.transportService = transportService;
|
this.transportService = transportService;
|
||||||
this.nodesProvider = nodesProvider;
|
this.nodesProvider = nodesProvider;
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
|
||||||
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new, ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
|
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
|
||||||
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new, ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
|
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
|
||||||
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
|
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
|
||||||
|
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
|
||||||
|
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
|
||||||
|
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
|
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
|
||||||
transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME);
|
transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode),
|
||||||
|
EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
|
public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
|
||||||
transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node),
|
||||||
|
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
|
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
|
||||||
transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME)
|
transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
|
||||||
.txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validates the join request, throwing a failure if it failed.
|
* Validates the join request, throwing a failure if it failed.
|
||||||
*/
|
*/
|
||||||
public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
|
public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
|
||||||
transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state), EmptyTransportResponseHandler.INSTANCE_SAME)
|
transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state),
|
||||||
.txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class JoinRequest extends TransportRequest {
|
public static class JoinRequest extends TransportRequest {
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.fd;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.publish;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
@ -16,7 +16,8 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.discovery.zen.publish;
|
|
||||||
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
@ -34,10 +35,12 @@ import java.util.Objects;
|
||||||
* <p>
|
* <p>
|
||||||
* The queue is bound by {@link #maxQueueSize}. When the queue is at capacity and a new cluster state is inserted
|
* The queue is bound by {@link #maxQueueSize}. When the queue is at capacity and a new cluster state is inserted
|
||||||
* the oldest cluster state will be dropped. This is safe because:
|
* the oldest cluster state will be dropped. This is safe because:
|
||||||
* 1) Under normal operations, master will publish & commit a cluster state before processing another change (i.e., the queue length is 1)
|
* 1) Under normal operations, master will publish & commit a cluster state before processing
|
||||||
|
* another change (i.e., the queue length is 1)
|
||||||
* 2) If the master fails to commit a change, it will step down, causing a master election, which will flush the queue.
|
* 2) If the master fails to commit a change, it will step down, causing a master election, which will flush the queue.
|
||||||
* 3) In general it's safe to process the incoming cluster state as a replacement to the cluster state that's dropped.
|
* 3) In general it's safe to process the incoming cluster state as a replacement to the cluster state that's dropped.
|
||||||
* a) If the dropped cluster is from the same master as the incoming one is, it is likely to be superseded by the incoming state (or another state in the queue).
|
* a) If the dropped cluster is from the same master as the incoming one is, it is likely to be superseded by the
|
||||||
|
* incoming state (or another state in the queue).
|
||||||
* This is only not true in very extreme cases of out of order delivery.
|
* This is only not true in very extreme cases of out of order delivery.
|
||||||
* b) If the dropping cluster state is not from the same master, it means that:
|
* b) If the dropping cluster state is not from the same master, it means that:
|
||||||
* i) we are no longer following the master of the dropped cluster state but follow the incoming one
|
* i) we are no longer following the master of the dropped cluster state but follow the incoming one
|
||||||
|
@ -70,7 +73,8 @@ public class PendingClusterStatesQueue {
|
||||||
ClusterStateContext context = pendingStates.remove(0);
|
ClusterStateContext context = pendingStates.remove(0);
|
||||||
logger.warn("dropping pending state [{}]. more than [{}] pending states.", context, maxQueueSize);
|
logger.warn("dropping pending state [{}]. more than [{}] pending states.", context, maxQueueSize);
|
||||||
if (context.committed()) {
|
if (context.committed()) {
|
||||||
context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)", maxQueueSize));
|
context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)",
|
||||||
|
maxQueueSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,11 +86,13 @@ public class PendingClusterStatesQueue {
|
||||||
public synchronized ClusterState markAsCommitted(String stateUUID, StateProcessedListener listener) {
|
public synchronized ClusterState markAsCommitted(String stateUUID, StateProcessedListener listener) {
|
||||||
final ClusterStateContext context = findState(stateUUID);
|
final ClusterStateContext context = findState(stateUUID);
|
||||||
if (context == null) {
|
if (context == null) {
|
||||||
listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid [" + stateUUID + "] to commit"));
|
listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid" +
|
||||||
|
" [" + stateUUID + "] to commit"));
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (context.committed()) {
|
if (context.committed()) {
|
||||||
listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid [" + stateUUID + "] is already committed"));
|
listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid" +
|
||||||
|
" [" + stateUUID + "] is already committed"));
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
context.markAsCommitted(listener);
|
context.markAsCommitted(listener);
|
||||||
|
@ -94,13 +100,14 @@ public class PendingClusterStatesQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* mark that the processing of the given state has failed. All committed states that are {@link ClusterState#supersedes(ClusterState)}-ed
|
* mark that the processing of the given state has failed. All committed states that are
|
||||||
* by this failed state, will be failed as well
|
* {@link ClusterState#supersedes(ClusterState)}-ed by this failed state, will be failed as well
|
||||||
*/
|
*/
|
||||||
public synchronized void markAsFailed(ClusterState state, Exception reason) {
|
public synchronized void markAsFailed(ClusterState state, Exception reason) {
|
||||||
final ClusterStateContext failedContext = findState(state.stateUUID());
|
final ClusterStateContext failedContext = findState(state.stateUUID());
|
||||||
if (failedContext == null) {
|
if (failedContext == null) {
|
||||||
throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]");
|
throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID()
|
||||||
|
+ "], version [" + state.version() + "]");
|
||||||
}
|
}
|
||||||
if (failedContext.committed() == false) {
|
if (failedContext.committed() == false) {
|
||||||
throw new IllegalArgumentException("failed cluster state is not committed " + state);
|
throw new IllegalArgumentException("failed cluster state is not committed " + state);
|
||||||
|
@ -128,15 +135,16 @@ public class PendingClusterStatesQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* indicates that a cluster state was successfully processed. Any committed state that is {@link ClusterState#supersedes(ClusterState)}-ed
|
* indicates that a cluster state was successfully processed. Any committed state that is
|
||||||
* by the processed state will be marked as processed as well.
|
* {@link ClusterState#supersedes(ClusterState)}-ed by the processed state will be marked as processed as well.
|
||||||
* <p>
|
* <p>
|
||||||
* NOTE: successfully processing a state indicates we are following the master it came from. Any committed state from another master will
|
* NOTE: successfully processing a state indicates we are following the master it came from. Any committed state
|
||||||
* be failed by this method
|
* from another master will be failed by this method
|
||||||
*/
|
*/
|
||||||
public synchronized void markAsProcessed(ClusterState state) {
|
public synchronized void markAsProcessed(ClusterState state) {
|
||||||
if (findState(state.stateUUID()) == null) {
|
if (findState(state.stateUUID()) == null) {
|
||||||
throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]");
|
throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID()
|
||||||
|
+ "], version [" + state.version() + "]");
|
||||||
}
|
}
|
||||||
final DiscoveryNode currentMaster = state.nodes().getMasterNode();
|
final DiscoveryNode currentMaster = state.nodes().getMasterNode();
|
||||||
assert currentMaster != null : "processed cluster state mast have a master. " + state;
|
assert currentMaster != null : "processed cluster state mast have a master. " + state;
|
||||||
|
@ -152,17 +160,16 @@ public class PendingClusterStatesQueue {
|
||||||
contextsToRemove.add(pendingContext);
|
contextsToRemove.add(pendingContext);
|
||||||
if (pendingContext.committed()) {
|
if (pendingContext.committed()) {
|
||||||
// this is a committed state , warn
|
// this is a committed state , warn
|
||||||
logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one, rejecting (received {}, current {})",
|
logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one,"
|
||||||
pendingState.stateUUID(), pendingState.version(),
|
+ " rejecting (received {}, current {})",
|
||||||
pendingMasterNode, currentMaster);
|
pendingState.stateUUID(), pendingState.version(), pendingMasterNode, currentMaster);
|
||||||
pendingContext.listener.onNewClusterStateFailed(
|
pendingContext.listener.onNewClusterStateFailed(
|
||||||
new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")")
|
new IllegalStateException("cluster state from a different master than the current one," +
|
||||||
);
|
" rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")"));
|
||||||
} else {
|
} else {
|
||||||
logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from [{}] was successfully processed",
|
logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from" +
|
||||||
pendingState.stateUUID(), pendingState.version(), pendingMasterNode,
|
" [{}] was successfully processed",
|
||||||
currentMaster
|
pendingState.stateUUID(), pendingState.version(), pendingMasterNode, currentMaster);
|
||||||
);
|
|
||||||
}
|
}
|
||||||
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
|
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
|
||||||
assert pendingContext.committed() : "processed cluster state is not committed " + state;
|
assert pendingContext.committed() : "processed cluster state is not committed " + state;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.publish;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
@ -42,7 +42,6 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
|
||||||
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
|
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.BytesTransportRequest;
|
import org.elasticsearch.transport.BytesTransportRequest;
|
||||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||||
|
@ -114,10 +113,12 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
* publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
|
* publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
|
||||||
* be processed by the master and the other nodes.
|
* be processed by the master and the other nodes.
|
||||||
* <p>
|
* <p>
|
||||||
* The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException} if the change is not committed and should be rejected.
|
* The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException}
|
||||||
|
* if the change is not committed and should be rejected.
|
||||||
* Any other exception signals the something wrong happened but the change is committed.
|
* Any other exception signals the something wrong happened but the change is committed.
|
||||||
*/
|
*/
|
||||||
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException {
|
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
|
||||||
|
final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException {
|
||||||
final DiscoveryNodes nodes;
|
final DiscoveryNodes nodes;
|
||||||
final SendingController sendingController;
|
final SendingController sendingController;
|
||||||
final Set<DiscoveryNode> nodesToPublishTo;
|
final Set<DiscoveryNode> nodesToPublishTo;
|
||||||
|
@ -145,8 +146,10 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
|
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
|
||||||
nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
|
nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
|
||||||
|
|
||||||
final BlockingClusterStatePublishResponseHandler publishResponseHandler = new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
|
final BlockingClusterStatePublishResponseHandler publishResponseHandler =
|
||||||
sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler);
|
new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
|
||||||
|
sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
|
||||||
|
totalMasterNodes, publishResponseHandler);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
|
throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
|
||||||
}
|
}
|
||||||
|
@ -197,7 +200,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
|
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
|
||||||
// everyone may have just responded
|
// everyone may have just responded
|
||||||
if (pendingNodes.length > 0) {
|
if (pendingNodes.length > 0) {
|
||||||
logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", clusterState.version(), publishTimeout, pendingNodes);
|
logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
|
||||||
|
clusterState.version(), publishTimeout, pendingNodes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -207,7 +211,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,
|
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,
|
||||||
boolean sendFullVersion, Map<Version, BytesReference> serializedStates, Map<Version, BytesReference> serializedDiffs) {
|
boolean sendFullVersion, Map<Version, BytesReference> serializedStates,
|
||||||
|
Map<Version, BytesReference> serializedDiffs) {
|
||||||
Diff<ClusterState> diff = null;
|
Diff<ClusterState> diff = null;
|
||||||
for (final DiscoveryNode node : nodesToPublishTo) {
|
for (final DiscoveryNode node : nodesToPublishTo) {
|
||||||
try {
|
try {
|
||||||
|
@ -240,7 +245,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
serializedStates.put(node.getVersion(), bytes);
|
serializedStates.put(node.getVersion(), bytes);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("failed to serialize cluster_state before publishing it to node {}", node), e);
|
(org.apache.logging.log4j.util.Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("failed to serialize cluster_state before publishing it to node {}", node), e);
|
||||||
sendingController.onNodeSendFailed(node, e);
|
sendingController.onNodeSendFailed(node, e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -266,7 +272,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
// -> no need to put a timeout on the options here, because we want the response to eventually be received
|
// -> no need to put a timeout on the options here, because we want the response to eventually be received
|
||||||
// and not log an error if it arrives after the timeout
|
// and not log an error if it arrives after the timeout
|
||||||
// -> no need to compress, we already compressed the bytes
|
// -> no need to compress, we already compressed the bytes
|
||||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
|
TransportRequestOptions options = TransportRequestOptions.builder()
|
||||||
|
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
|
||||||
transportService.sendRequest(node, SEND_ACTION_NAME,
|
transportService.sendRequest(node, SEND_ACTION_NAME,
|
||||||
new BytesTransportRequest(bytes, node.getVersion()),
|
new BytesTransportRequest(bytes, node.getVersion()),
|
||||||
options,
|
options,
|
||||||
|
@ -275,7 +282,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
@Override
|
@Override
|
||||||
public void handleResponse(TransportResponse.Empty response) {
|
public void handleResponse(TransportResponse.Empty response) {
|
||||||
if (sendingController.getPublishingTimedOut()) {
|
if (sendingController.getPublishingTimedOut()) {
|
||||||
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
|
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
|
||||||
|
clusterState.version(), publishTimeout);
|
||||||
}
|
}
|
||||||
sendingController.onNodeSendAck(node);
|
sendingController.onNodeSendAck(node);
|
||||||
}
|
}
|
||||||
|
@ -286,21 +294,24 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
|
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
|
||||||
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
|
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
|
||||||
} else {
|
} else {
|
||||||
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
|
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("failed to send cluster state to {}", node), exp);
|
||||||
sendingController.onNodeSendFailed(node, exp);
|
sendingController.onNodeSendFailed(node, exp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("error sending cluster state to {}", node), e);
|
(org.apache.logging.log4j.util.Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("error sending cluster state to {}", node), e);
|
||||||
sendingController.onNodeSendFailed(node, e);
|
sendingController.onNodeSendFailed(node, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
|
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
|
||||||
try {
|
try {
|
||||||
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
|
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
|
||||||
|
clusterState.stateUUID(), clusterState.version(), node);
|
||||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
|
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
|
||||||
// no need to put a timeout on the options here, because we want the response to eventually be received
|
// no need to put a timeout on the options here, because we want the response to eventually be received
|
||||||
// and not log an error if it arrives after the timeout
|
// and not log an error if it arrives after the timeout
|
||||||
|
@ -319,12 +330,16 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}", clusterState.stateUUID(), clusterState.version(), node), exp);
|
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",
|
||||||
|
clusterState.stateUUID(), clusterState.version(), node), exp);
|
||||||
sendingController.getPublishResponseHandler().onFailure(node, exp);
|
sendingController.getPublishResponseHandler().onFailure(node, exp);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception t) {
|
} catch (Exception t) {
|
||||||
logger.warn((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}", clusterState.stateUUID(), clusterState.version(), node), t);
|
logger.warn((org.apache.logging.log4j.util.Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",
|
||||||
|
clusterState.stateUUID(), clusterState.version(), node), t);
|
||||||
sendingController.getPublishResponseHandler().onFailure(node, t);
|
sendingController.getPublishResponseHandler().onFailure(node, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -371,7 +386,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
} else if (lastSeenClusterState != null) {
|
} else if (lastSeenClusterState != null) {
|
||||||
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
|
Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
|
||||||
incomingState = diff.apply(lastSeenClusterState);
|
incomingState = diff.apply(lastSeenClusterState);
|
||||||
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
||||||
|
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
||||||
} else {
|
} else {
|
||||||
logger.debug("received diff for but don't have any local cluster state - requesting full state");
|
logger.debug("received diff for but don't have any local cluster state - requesting full state");
|
||||||
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
||||||
|
@ -394,13 +410,15 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
|
void validateIncomingState(ClusterState incomingState, ClusterState lastSeenClusterState) {
|
||||||
final ClusterName incomingClusterName = incomingState.getClusterName();
|
final ClusterName incomingClusterName = incomingState.getClusterName();
|
||||||
if (!incomingClusterName.equals(this.clusterName)) {
|
if (!incomingClusterName.equals(this.clusterName)) {
|
||||||
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
|
logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]",
|
||||||
|
incomingState.nodes().getMasterNode(), incomingClusterName);
|
||||||
throw new IllegalStateException("received state from a node that is not part of the cluster");
|
throw new IllegalStateException("received state from a node that is not part of the cluster");
|
||||||
}
|
}
|
||||||
final ClusterState clusterState = clusterStateSupplier.get();
|
final ClusterState clusterState = clusterStateSupplier.get();
|
||||||
|
|
||||||
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
|
if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
|
||||||
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
|
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen",
|
||||||
|
incomingState.nodes().getMasterNode());
|
||||||
throw new IllegalStateException("received state with a local node that does not match the current local node");
|
throw new IllegalStateException("received state with a local node that does not match the current local node");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,7 +437,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
|
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
|
||||||
final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, new PendingClusterStatesQueue.StateProcessedListener() {
|
final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID,
|
||||||
|
new PendingClusterStatesQueue.StateProcessedListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onNewClusterStateProcessed() {
|
public void onNewClusterStateProcessed() {
|
||||||
try {
|
try {
|
||||||
|
@ -442,7 +461,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (state != null) {
|
if (state != null) {
|
||||||
newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() + " committed version [" + state.version() + "]");
|
newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() +
|
||||||
|
" committed version [" + state.version() + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -512,13 +532,15 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
// an external marker to note that the publishing process is timed out. This is useful for proper logging.
|
// an external marker to note that the publishing process is timed out. This is useful for proper logging.
|
||||||
final AtomicBoolean publishingTimedOut = new AtomicBoolean();
|
final AtomicBoolean publishingTimedOut = new AtomicBoolean();
|
||||||
|
|
||||||
private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) {
|
private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes,
|
||||||
|
BlockingClusterStatePublishResponseHandler publishResponseHandler) {
|
||||||
this.clusterState = clusterState;
|
this.clusterState = clusterState;
|
||||||
this.publishResponseHandler = publishResponseHandler;
|
this.publishResponseHandler = publishResponseHandler;
|
||||||
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
|
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
|
||||||
this.pendingMasterNodes = totalMasterNodes - 1;
|
this.pendingMasterNodes = totalMasterNodes - 1;
|
||||||
if (this.neededMastersToCommit > this.pendingMasterNodes) {
|
if (this.neededMastersToCommit > this.pendingMasterNodes) {
|
||||||
throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
|
throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state." +
|
||||||
|
"[{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
|
||||||
}
|
}
|
||||||
this.committed = neededMastersToCommit == 0;
|
this.committed = neededMastersToCommit == 0;
|
||||||
this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1);
|
this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1);
|
||||||
|
@ -592,7 +614,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
|
|
||||||
public synchronized void onNodeSendFailed(DiscoveryNode node, Exception e) {
|
public synchronized void onNodeSendFailed(DiscoveryNode node, Exception e) {
|
||||||
if (node.isMasterNode()) {
|
if (node.isMasterNode()) {
|
||||||
logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
|
logger.trace("master node {} failed to ack cluster state version [{}]. " +
|
||||||
|
"processing ... (current pending [{}], needed [{}])",
|
||||||
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
|
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
|
||||||
decrementPendingMasterAcksAndChangeForFailure();
|
decrementPendingMasterAcksAndChangeForFailure();
|
||||||
}
|
}
|
||||||
|
@ -623,7 +646,8 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
if (committedOrFailed()) {
|
if (committedOrFailed()) {
|
||||||
return committed == false;
|
return committed == false;
|
||||||
}
|
}
|
||||||
logger.trace((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("failed to commit version [{}]. {}", clusterState.version(), details), reason);
|
logger.trace((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("failed to commit version [{}]. {}",
|
||||||
|
clusterState.version(), details), reason);
|
||||||
committed = false;
|
committed = false;
|
||||||
committedOrFailedLatch.countDown();
|
committedOrFailedLatch.countDown();
|
||||||
return true;
|
return true;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping.unicast;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping.unicast;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
@ -44,9 +44,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.ConnectTransportException;
|
import org.elasticsearch.transport.ConnectTransportException;
|
||||||
import org.elasticsearch.transport.RemoteTransportException;
|
import org.elasticsearch.transport.RemoteTransportException;
|
||||||
|
@ -84,7 +81,7 @@ import static java.util.Collections.emptyList;
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
import static java.util.Collections.emptySet;
|
||||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||||
import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.readPingResponse;
|
import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse;
|
||||||
|
|
||||||
public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing {
|
public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing {
|
||||||
|
|
|
@ -54,14 +54,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.DiscoveryStats;
|
import org.elasticsearch.discovery.DiscoveryStats;
|
||||||
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
|
|
||||||
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
|
|
||||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
|
||||||
import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
|
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||||
import org.elasticsearch.transport.TransportChannel;
|
import org.elasticsearch.transport.TransportChannel;
|
||||||
|
@ -288,7 +280,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
return clusterName.value() + "/" + clusterService.localNode().getId();
|
return clusterName.value() + "/" + clusterService.localNode().getId();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** start of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
|
/** start of {@link PingContextProvider } implementation */
|
||||||
@Override
|
@Override
|
||||||
public DiscoveryNodes nodes() {
|
public DiscoveryNodes nodes() {
|
||||||
return clusterService.state().nodes();
|
return clusterService.state().nodes();
|
||||||
|
@ -299,7 +291,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
||||||
return clusterService.state();
|
return clusterService.state();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** end of {@link org.elasticsearch.discovery.zen.ping.PingContextProvider } implementation */
|
/** end of {@link PingContextProvider } implementation */
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -159,8 +158,8 @@ public interface ZenPing extends LifecycleComponent {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "], cluster_state_version [" + clusterStateVersion
|
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +
|
||||||
+ "], cluster_name[" + clusterName.value() + "]}";
|
"cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
|
@ -23,7 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.discovery.DiscoveryStats;
|
import org.elasticsearch.discovery.DiscoveryStats;
|
||||||
import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
|
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
|
||||||
import org.elasticsearch.http.HttpStats;
|
import org.elasticsearch.http.HttpStats;
|
||||||
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
|
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
|
||||||
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
|
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||||
|
|
|
@ -51,12 +51,12 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
import org.elasticsearch.discovery.zen.MembershipAction;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.ZenPing;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
import org.elasticsearch.discovery.zen.ZenPingService;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
||||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||||
|
|
|
@ -34,9 +34,9 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
|
import org.elasticsearch.discovery.zen.MasterFaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
|
import org.elasticsearch.discovery.zen.NodesFaultDetection;
|
||||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.VersionUtils;
|
import org.elasticsearch.test.VersionUtils;
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.discovery.zen.publish;
|
|
||||||
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
@ -25,7 +26,8 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.zen.publish.PendingClusterStatesQueue.ClusterStateContext;
|
import org.elasticsearch.discovery.zen.PendingClusterStatesQueue;
|
||||||
|
import org.elasticsearch.discovery.zen.PendingClusterStatesQueue.ClusterStateContext;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.publish;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
|
||||||
|
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -100,21 +101,25 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
|
|
||||||
public MockNode(DiscoveryNode discoveryNode, MockTransportService service, @Nullable ClusterStateListener listener, Logger logger) {
|
public MockNode(DiscoveryNode discoveryNode, MockTransportService service,
|
||||||
|
@Nullable ClusterStateListener listener, Logger logger) {
|
||||||
this.discoveryNode = discoveryNode;
|
this.discoveryNode = discoveryNode;
|
||||||
this.service = service;
|
this.service = service;
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build();
|
this.clusterState = ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder()
|
||||||
|
.add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MockNode setAsMaster() {
|
public MockNode setAsMaster() {
|
||||||
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(discoveryNode.getId())).build();
|
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
|
||||||
|
.masterNodeId(discoveryNode.getId())).build();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MockNode resetMasterId() {
|
public MockNode resetMasterId() {
|
||||||
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(null)).build();
|
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
|
||||||
|
.masterNodeId(null)).build();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,7 +131,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void onNewClusterState(String reason) {
|
public void onNewClusterState(String reason) {
|
||||||
ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess();
|
ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess();
|
||||||
logger.debug("[{}] received version [{}], uuid [{}]", discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID());
|
logger.debug("[{}] received version [{}], uuid [{}]",
|
||||||
|
discoveryNode.getName(), newClusterState.version(), newClusterState.stateUUID());
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
|
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
|
||||||
listener.clusterChanged(event);
|
listener.clusterChanged(event);
|
||||||
|
@ -156,7 +162,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
ThreadPool threadPool, Logger logger, Map<String, MockNode> nodes) throws Exception {
|
ThreadPool threadPool, Logger logger, Map<String, MockNode> nodes) throws Exception {
|
||||||
final Settings settings = Settings.builder()
|
final Settings settings = Settings.builder()
|
||||||
.put("name", name)
|
.put("name", name)
|
||||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "",
|
||||||
|
TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||||
.put(basSettings)
|
.put(basSettings)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -229,7 +236,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) {
|
private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) {
|
||||||
MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null );
|
MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
|
||||||
transportService.start();
|
transportService.start();
|
||||||
transportService.acceptIncomingRequests();
|
transportService.acceptIncomingRequests();
|
||||||
return transportService;
|
return transportService;
|
||||||
|
@ -268,7 +275,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
// cluster state update - add block
|
// cluster state update - add block
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder()
|
||||||
|
.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
||||||
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
||||||
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
||||||
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1));
|
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1));
|
||||||
|
@ -295,7 +303,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
// cluster state update 4 - update settings
|
// cluster state update 4 - update settings
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
MetaData metaData = MetaData.builder(clusterState.metaData()).transientSettings(Settings.builder().put("foo", "bar").build()).build();
|
MetaData metaData = MetaData.builder(clusterState.metaData())
|
||||||
|
.transientSettings(Settings.builder().put("foo", "bar").build()).build();
|
||||||
clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build();
|
clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build();
|
||||||
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
||||||
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
||||||
|
@ -338,7 +347,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
MockNode nodeB = createMockNode("nodeB");
|
MockNode nodeB = createMockNode("nodeB");
|
||||||
|
|
||||||
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
|
// Initial cluster state with both states - the second node still shouldn't
|
||||||
|
// get diff even though it's present in the previous cluster state
|
||||||
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
|
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
|
||||||
ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
||||||
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
|
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
|
||||||
|
@ -347,7 +357,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
// cluster state update - add block
|
// cluster state update - add block
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder()
|
||||||
|
.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
||||||
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
||||||
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
assertSameStateFromDiff(nodeB.clusterState, clusterState);
|
||||||
}
|
}
|
||||||
|
@ -370,7 +381,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Initial cluster state
|
// Initial cluster state
|
||||||
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build();
|
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
|
||||||
|
.add(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.getId()).masterNodeId(nodeA.discoveryNode.getId()).build();
|
||||||
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
||||||
|
|
||||||
// cluster state update - add nodeB
|
// cluster state update - add nodeB
|
||||||
|
@ -381,7 +393,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
// cluster state update - add block
|
// cluster state update - add block
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder()
|
||||||
|
.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
||||||
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,7 +459,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
MockNode nodeB = createMockNode("nodeB");
|
MockNode nodeB = createMockNode("nodeB");
|
||||||
|
|
||||||
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
|
// Initial cluster state with both states - the second node still shouldn't get
|
||||||
|
// diff even though it's present in the previous cluster state
|
||||||
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
|
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
|
||||||
ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
ClusterState previousClusterState = ClusterState.builder(CLUSTER_NAME).nodes(discoveryNodes).build();
|
||||||
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
|
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
|
||||||
|
@ -455,7 +469,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
// cluster state update - add block
|
// cluster state update - add block
|
||||||
previousClusterState = clusterState;
|
previousClusterState = clusterState;
|
||||||
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder()
|
||||||
|
.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
|
||||||
|
|
||||||
ClusterState unserializableClusterState = new ClusterState(clusterState.version(), clusterState.stateUUID(), clusterState) {
|
ClusterState unserializableClusterState = new ClusterState(clusterState.version(), clusterState.stateUUID(), clusterState) {
|
||||||
@Override
|
@Override
|
||||||
|
@ -603,7 +618,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
node.action.validateIncomingState(state, null);
|
node.action.validateIncomingState(state, null);
|
||||||
|
|
||||||
// now set a master node
|
// now set a master node
|
||||||
node.clusterState = ClusterState.builder(node.clusterState).nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build();
|
node.clusterState = ClusterState.builder(node.clusterState)
|
||||||
|
.nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build();
|
||||||
logger.info("--> testing rejection of another master");
|
logger.info("--> testing rejection of another master");
|
||||||
try {
|
try {
|
||||||
node.action.validateIncomingState(state, node.clusterState);
|
node.action.validateIncomingState(state, node.clusterState);
|
||||||
|
@ -619,7 +635,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
logger.info("--> testing rejection of another cluster name");
|
logger.info("--> testing rejection of another cluster name");
|
||||||
try {
|
try {
|
||||||
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState);
|
node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10)))
|
||||||
|
.nodes(node.nodes()).build(), node.clusterState);
|
||||||
fail("node accepted state with another cluster name");
|
fail("node accepted state with another cluster name");
|
||||||
} catch (IllegalStateException OK) {
|
} catch (IllegalStateException OK) {
|
||||||
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
|
assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
|
||||||
|
@ -687,8 +704,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
logger.info("--> publishing states");
|
logger.info("--> publishing states");
|
||||||
for (ClusterState state : states) {
|
for (ClusterState state : states) {
|
||||||
node.action.handleIncomingClusterStateRequest(
|
node.action.handleIncomingClusterStateRequest(
|
||||||
new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT),
|
new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT),
|
||||||
channel);
|
channel);
|
||||||
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
|
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
|
||||||
assertThat(channel.error.get(), nullValue());
|
assertThat(channel.error.get(), nullValue());
|
||||||
channel.clear();
|
channel.clear();
|
||||||
|
@ -725,12 +742,14 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
*/
|
*/
|
||||||
public void testTimeoutOrCommit() throws Exception {
|
public void testTimeoutOrCommit() throws Exception {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); // short but so we will sometime commit sometime timeout
|
// short but so we will sometime commit sometime timeout
|
||||||
|
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build();
|
||||||
|
|
||||||
MockNode master = createMockNode("master", settings, null);
|
MockNode master = createMockNode("master", settings, null);
|
||||||
MockNode node = createMockNode("node", settings, null);
|
MockNode node = createMockNode("node", settings, null);
|
||||||
ClusterState state = ClusterState.builder(master.clusterState)
|
ClusterState state = ClusterState.builder(master.clusterState)
|
||||||
.nodes(DiscoveryNodes.builder(master.clusterState.nodes()).add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build();
|
.nodes(DiscoveryNodes.builder(master.clusterState.nodes())
|
||||||
|
.add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build();
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
state = ClusterState.builder(state).incrementVersion().build();
|
state = ClusterState.builder(state).incrementVersion().build();
|
||||||
|
@ -755,7 +774,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
|
private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
|
||||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
|
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
|
||||||
indices.put("test" + version, IndexMetaData.builder("test" + version).settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
|
indices.put("test" + version, IndexMetaData.builder("test" + version)
|
||||||
|
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
|
||||||
.numberOfShards((int) version).numberOfReplicas(0).build());
|
.numberOfShards((int) version).numberOfReplicas(0).build());
|
||||||
return MetaData.builder(metaData)
|
return MetaData.builder(metaData)
|
||||||
.transientSettings(Settings.builder().put("test", version).build())
|
.transientSettings(Settings.builder().put("test", version).build())
|
||||||
|
@ -772,16 +792,19 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
assertThat(metaData.transientSettings().get("test"), equalTo(Long.toString(version)));
|
assertThat(metaData.transientSettings().get("test"), equalTo(Long.toString(version)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void publishStateAndWait(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
|
public void publishStateAndWait(PublishClusterStateAction action, ClusterState state,
|
||||||
|
ClusterState previousState) throws InterruptedException {
|
||||||
publishState(action, state, previousState).await(1, TimeUnit.SECONDS);
|
publishState(action, state, previousState).await(1, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState) throws InterruptedException {
|
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state,
|
||||||
|
ClusterState previousState) throws InterruptedException {
|
||||||
final int minimumMasterNodes = randomIntBetween(-1, state.nodes().getMasterNodes().size());
|
final int minimumMasterNodes = randomIntBetween(-1, state.nodes().getMasterNodes().size());
|
||||||
return publishState(action, state, previousState, minimumMasterNodes);
|
return publishState(action, state, previousState, minimumMasterNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state, ClusterState previousState, int minMasterNodes) throws InterruptedException {
|
public AssertingAckListener publishState(PublishClusterStateAction action, ClusterState state,
|
||||||
|
ClusterState previousState, int minMasterNodes) throws InterruptedException {
|
||||||
AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1);
|
AssertingAckListener assertingAckListener = new AssertingAckListener(state.nodes().getSize() - 1);
|
||||||
ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState);
|
ClusterChangedEvent changedEvent = new ClusterChangedEvent("test update", state, previousState);
|
||||||
action.publish(changedEvent, minMasterNodes, assertingAckListener);
|
action.publish(changedEvent, minMasterNodes, assertingAckListener);
|
||||||
|
@ -829,7 +852,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
|
|
||||||
void assertSameState(ClusterState actual, ClusterState expected) {
|
void assertSameState(ClusterState actual, ClusterState expected) {
|
||||||
assertThat(actual, notNullValue());
|
assertThat(actual, notNullValue());
|
||||||
final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint();
|
final String reason = "\n--> actual ClusterState: " + actual.prettyPrint() + "\n" +
|
||||||
|
"--> expected ClusterState:" + expected.prettyPrint();
|
||||||
assertThat("unequal UUIDs" + reason, actual.stateUUID(), equalTo(expected.stateUUID()));
|
assertThat("unequal UUIDs" + reason, actual.stateUUID(), equalTo(expected.stateUUID()));
|
||||||
assertThat("unequal versions" + reason, actual.version(), equalTo(expected.version()));
|
assertThat("unequal versions" + reason, actual.version(), equalTo(expected.version()));
|
||||||
}
|
}
|
||||||
|
@ -851,7 +875,9 @@ public class PublishClusterStateActionTests extends ESTestCase {
|
||||||
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
|
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
|
||||||
AtomicBoolean errorOnCommit = new AtomicBoolean();
|
AtomicBoolean errorOnCommit = new AtomicBoolean();
|
||||||
|
|
||||||
public MockPublishAction(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
|
public MockPublishAction(Settings settings, TransportService transportService,
|
||||||
|
Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener,
|
||||||
|
DiscoverySettings discoverySettings, ClusterName clusterName) {
|
||||||
super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName);
|
super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.discovery.zen.ping.unicast;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
|
@ -34,8 +34,9 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.PingContextProvider;
|
||||||
|
import org.elasticsearch.discovery.zen.ZenPing;
|
||||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.VersionUtils;
|
import org.elasticsearch.test.VersionUtils;
|
|
@ -37,9 +37,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoveryStats;
|
import org.elasticsearch.discovery.DiscoveryStats;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
|
||||||
import org.elasticsearch.discovery.zen.membership.MembershipAction;
|
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
|
|
||||||
import org.elasticsearch.node.Node;
|
import org.elasticsearch.node.Node;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.test.TestCustomMetaData;
|
import org.elasticsearch.test.TestCustomMetaData;
|
||||||
|
|
|
@ -32,9 +32,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPingService;
|
|
||||||
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.transport.MockTransportService;
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.SingleObjectCache;
|
import org.elasticsearch.common.util.SingleObjectCache;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing.resolveDiscoveryNodes;
|
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
|
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
|
||||||
|
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
|
@ -26,8 +26,8 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.discovery.DiscoveryModule;
|
import org.elasticsearch.discovery.DiscoveryModule;
|
||||||
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
|
import org.elasticsearch.discovery.zen.PingContextProvider;
|
||||||
import org.elasticsearch.discovery.zen.ping.ZenPing;
|
import org.elasticsearch.discovery.zen.ZenPing;
|
||||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue