Fix line length for bootstrap/client/discovery/gateway files (#34905)
Removes the checkstyle LineLength suppressions for files in the org.elasticsearch bootstrap, client, discovery, and gateway packages, and wraps the offending lines. Relates to #34884
commit 01c62fc06b
parent 052dfa5646
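
For context on how the two pieces fit together: checkstyle's LineLength module applies build-wide, and the suppressions file opts individual files out of it. A minimal sketch of both (the 140-column limit and the file name here are illustrative assumptions, not copied from the build):

    <!-- checkstyle.xml: the rule that now applies to the cleaned-up files;
         the 140-column limit is an assumption, not taken from this commit -->
    <module name="LineLength">
        <property name="max" value="140"/>
    </module>

    <!-- checkstyle_suppressions.xml: one entry per file that is still exempt;
         SomeFile.java is a hypothetical placeholder -->
    <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]SomeFile.java" checks="LineLength" />

Deleting a <suppress> entry is what forces the wrapping changes below: every line over the limit in the de-suppressed files has to be split.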
buildSrc/src/main/resources/checkstyle_suppressions.xml

@@ -108,10 +108,6 @@
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]TransportUpdateAction.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequest.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequestBuilder.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]bootstrap[/\\]JNANatives.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]FilterClient.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]support[/\\]AbstractClient.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]TransportClient.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateObserver.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateUpdateTask.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]DiffableUtils.java" checks="LineLength" />
@@ -146,15 +142,6 @@
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]command[/\\]AllocationCommands.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]command[/\\]MoveAllocationCommand.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]allocation[/\\]decider[/\\]AllocationDeciders.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]Discovery.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoverySettings.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscovery.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]LocalAllocateDangledIndices.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicyConfig.java" checks="LineLength" />
@@ -289,7 +276,6 @@
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]MultiTermVectorsIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TermVectorsUnitTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]aliases[/\\]IndexAliasesIT.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]AbstractClientHeadersTestCase.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterHealthIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterInfoServiceIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateDiffIT.java" checks="LineLength" />
@@ -360,20 +346,9 @@
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]shards[/\\]ClusterSearchShardsIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]structure[/\\]RoutingIteratorTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironmentTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]explain[/\\]ExplainActionIT.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayServiceTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]MetaDataStateFormatTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]MetaDataWriteDataNodesIT.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocatorTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PriorityComparatorTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]QuorumGatewayIT.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]RecoveryFromGatewayIT.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocatorTests.java" checks="LineLength" />
-  <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReusePeerRecoverySharedTest.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]get[/\\]GetActionIT.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexingSlowLogTests.java" checks="LineLength" />
   <suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicySettingsTests.java" checks="LineLength" />

server/src/main/java/org/elasticsearch/bootstrap/JNANatives.java

@@ -95,7 +95,8 @@ class JNANatives {
         logger.warn("This can result in part of the JVM being swapped out.");
         if (errno == JNACLibrary.ENOMEM) {
             if (rlimitSuccess) {
-                logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit), rlimitToString(hardLimit));
+                logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit),
+                    rlimitToString(hardLimit));
                 if (Constants.LINUX) {
                     // give specific instructions for the linux case to make it easy
                     String user = System.getProperty("user.name");

server/src/main/java/org/elasticsearch/client/support/AbstractClient.java

@@ -752,7 +752,8 @@ public abstract class AbstractClient extends AbstractComponent implements Client
        }

        @Override
-       public void updateSettings(final ClusterUpdateSettingsRequest request, final ActionListener<ClusterUpdateSettingsResponse> listener) {
+       public void updateSettings(final ClusterUpdateSettingsRequest request,
+                                  final ActionListener<ClusterUpdateSettingsResponse> listener) {
            execute(ClusterUpdateSettingsAction.INSTANCE, request, listener);
        }

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -76,7 +76,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;

 /**
  * The transport client allows to create a client that is not part of the cluster, but simply connects to one
- * or more nodes directly by adding their respective addresses using {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}.
+ * or more nodes directly by adding their respective addresses using
+ * {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}.
  * <p>
  * The transport client important modules used is the {@link org.elasticsearch.common.network.NetworkModule} which is
  * started in client mode (only connects, no bind).
@@ -223,7 +224,8 @@ public abstract class TransportClient extends AbstractClient {
            transportService.start();
            transportService.acceptIncomingRequests();

-           ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
+           ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy,
+               namedWriteableRegistry);
            resourcesToClose.clear();
            return transportClient;
        } finally {

server/src/main/java/org/elasticsearch/discovery/Discovery.java

@@ -43,7 +43,8 @@ public interface Discovery extends LifecycleComponent {
     * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
     * they updated their own cluster state or not.
     *
-    * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected.
+    * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not
+    * committed and should be rejected.
     * Any other exception signals the something wrong happened but the change is committed.
     */
    void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);

server/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java

@@ -37,8 +37,10 @@ import java.util.EnumSet;
 public class DiscoverySettings extends AbstractComponent {

     public static final int NO_MASTER_BLOCK_ID = 2;
-    public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
-    public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
+    public static final ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, false,
+        RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
+    public static final ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, false,
+        RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA_WRITE));
     /**
      * sets the timeout for a complete publishing cycle, including both sending and committing. the master
      * will continue to process the next cluster state update after this time has elapsed

server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -302,7 +302,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
            try {
                membership.sendLeaveRequest(nodes.getLocalNode(), possibleMaster);
            } catch (Exception e) {
-               logger.debug(() -> new ParameterizedMessage("failed to send leave request from master [{}] to possible master [{}]", nodes.getMasterNode(), possibleMaster), e);
+               logger.debug(() -> new ParameterizedMessage("failed to send leave request from master [{}] to possible master [{}]",
+                   nodes.getMasterNode(), possibleMaster), e);
            }
        }
    }
@@ -520,16 +521,19 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
            final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
            if (unwrap instanceof NotMasterException) {
                if (++joinAttempt == this.joinRetryAttempts) {
-                   logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
+                   logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode,
+                       ExceptionsHelper.detailedMessage(e), joinAttempt);
                    return false;
                } else {
-                   logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
+                   logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode,
+                       ExceptionsHelper.detailedMessage(e), joinAttempt);
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                } else {
-                   logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
+                   logger.info("failed to send join request to master [{}], reason [{}]", masterNode,
+                       ExceptionsHelper.detailedMessage(e));
                }
                return false;
            }
@@ -557,7 +561,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
    }

    // visible for testing
-   public static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
+   public static class NodeRemovalClusterStateTaskExecutor
+           implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {

        private final AllocationService allocationService;
        private final ElectMasterService electMasterService;
@@ -696,7 +701,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
        synchronized (stateMutex) {
            // check if we have enough master nodes, if not, we need to move into joining the cluster again
            if (!electMaster.hasEnoughMasterNodes(committedState.get().nodes())) {
-               rejoin("not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" + minimumMasterNodes + "]");
+               rejoin("not enough master nodes on change of minimum_master_nodes from [" + prevMinimumMasterNode + "] to [" +
+                   minimumMasterNodes + "]");
            }
        }
    }
@@ -734,10 +740,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
        }

        assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
-       assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
+       assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) :
+           "received a cluster state with a master block";

        if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {
-           handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(), "via a new cluster state");
+           handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
+               "via a new cluster state");
            return false;
        }

@@ -826,15 +834,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover

        // reject cluster states that are not new from the same master
        if (currentState.supersedes(newClusterState) ||
-           (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
+           (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) &&
+               currentState.version() == newClusterState.version())) {
            // if the new state has a smaller version, and it has the same master node, then no need to process it
-           logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
+           logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})",
+               newClusterState.version(), currentState.version());
            return true;
        }

        // reject older cluster states if we are following a master
        if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
-           logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
+           logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})",
+               newClusterState.version(), currentState.version());
            return true;
        }
        return false;
@@ -850,8 +861,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
            return;
        }
        if (!currentNodes.getMasterNodeId().equals(newClusterState.nodes().getMasterNodeId())) {
-           logger.warn("received a cluster state from a different master than the current one, rejecting (received {}, current {})", newClusterState.nodes().getMasterNode(), currentNodes.getMasterNode());
-           throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().getMasterNode() + ", current " + currentNodes.getMasterNode() + ")");
+           logger.warn("received a cluster state from a different master than the current one, rejecting (received {}, current {})",
+               newClusterState.nodes().getMasterNode(), currentNodes.getMasterNode());
+           throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " +
+               newClusterState.nodes().getMasterNode() + ", current " + currentNodes.getMasterNode() + ")");
        }
    }

@@ -941,13 +954,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
                return null;
            }
        } else {
-           assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
+           assert !activeMasters.contains(localNode) :
+               "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

-   static List<ZenPing.PingResponse> filterPingResponses(List<ZenPing.PingResponse> fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) {
+   static List<ZenPing.PingResponse> filterPingResponses(List<ZenPing.PingResponse> fullPingResponses,
+                                                         boolean masterElectionIgnoreNonMasters, Logger logger) {
        List<ZenPing.PingResponse> pingResponses;
        if (masterElectionIgnoreNonMasters) {
            pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
@@ -1004,7 +1019,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
        return clusterState().nodes().isLocalNodeElectedMaster();
    }

-   private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) {
+   private void handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion,
+                                    String reason) {
        assert localClusterState.nodes().isLocalNodeElectedMaster() : "handleAnotherMaster called but current node is not a master";
        assert Thread.holdsLock(stateMutex);

@@ -1012,13 +1028,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
            rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
        } else {
            // TODO: do this outside mutex
-           logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
+           logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])",
+               otherMaster, otherMaster, reason);
            try {
                // make sure we're connected to this node (connect to node does nothing if we're already connected)
                // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
                // in the past (after a master failure, for example)
                transportService.connectToNode(otherMaster);
-               transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+               transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME,
+                   new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()),
+                   new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                    @Override
                    public void handleException(TransportException exp) {
@@ -1140,10 +1159,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
            }

            if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) {
-               logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
+               logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(),
+                   pingsWhileMaster.get());
                return;
            }
-           logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
+           logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]",
+               pingRequest.masterNode(), pingsWhileMaster.get());
            synchronized (stateMutex) {
                ClusterState currentState = committedState.get();
                if (currentState.nodes().isLocalNodeElectedMaster()) {

server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -48,8 +48,10 @@ public class GatewayAllocator extends AbstractComponent {
    private final PrimaryShardAllocator primaryShardAllocator;
    private final ReplicaShardAllocator replicaShardAllocator;

-   private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
-   private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>> asyncFetchStore = ConcurrentCollections.newConcurrentMap();
+   private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>>
+       asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
+   private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>>
+       asyncFetchStore = ConcurrentCollections.newConcurrentMap();

    @Inject
    public GatewayAllocator(Settings settings, ClusterService clusterService, RoutingService routingService,
@@ -161,9 +163,11 @@ public class GatewayAllocator extends AbstractComponent {
        }

        @Override
-       protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
+       protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
+               fetchData(ShardRouting shard, RoutingAllocation allocation) {
            AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch =
-               asyncFetchStarted.computeIfAbsent(shard.shardId(), shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, startedAction));
+               asyncFetchStarted.computeIfAbsent(shard.shardId(),
+                   shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, startedAction));
            AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
                fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));

@@ -184,9 +188,11 @@ public class GatewayAllocator extends AbstractComponent {
        }

        @Override
-       protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
+       protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
+               fetchData(ShardRouting shard, RoutingAllocation allocation) {
            AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch =
-               asyncFetchStore.computeIfAbsent(shard.shardId(), shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction));
+               asyncFetchStore.computeIfAbsent(shard.shardId(),
+                   shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction));
            AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
                fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
            if (shardStores.hasData()) {

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

@@ -166,7 +166,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA


            relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices);
-           final Iterable<IndexMetaWriteInfo> writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
+           final Iterable<IndexMetaWriteInfo> writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices,
+               previousMetaData, event.state().metaData());
            // check and write changes in indices
            for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
                try {
@@ -303,11 +304,14 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
     *
     * @param previouslyWrittenIndices    A list of indices for which the state was already written before
     * @param potentiallyUnwrittenIndices The list of indices for which state should potentially be written
-    * @param previousMetaData            The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is persisted now
+    * @param previousMetaData            The last meta data we know of. meta data for all indices in previouslyWrittenIndices list is
+    *                                    persisted now
     * @param newMetaData                 The new metadata
     * @return iterable over all indices states that should be written to disk
     */
-   public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(Set<Index> previouslyWrittenIndices, Set<Index> potentiallyUnwrittenIndices, MetaData previousMetaData, MetaData newMetaData) {
+   public static Iterable<GatewayMetaState.IndexMetaWriteInfo> resolveStatesToBeWritten(Set<Index> previouslyWrittenIndices,
+                                                                                        Set<Index> potentiallyUnwrittenIndices,
+                                                                                        MetaData previousMetaData, MetaData newMetaData) {
        List<GatewayMetaState.IndexMetaWriteInfo> indicesToWrite = new ArrayList<>();
        for (Index index : potentiallyUnwrittenIndices) {
            IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index);
@@ -316,7 +320,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
            if (previouslyWrittenIndices.contains(index) == false || previousIndexMetaData == null) {
                writeReason = "freshly created";
            } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) {
-               writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]";
+               writeReason = "version changed from [" + previousIndexMetaData.getVersion() + "] to [" +
+                   newIndexMetaData.getVersion() + "]";
            }
            if (writeReason != null) {
                indicesToWrite.add(new GatewayMetaState.IndexMetaWriteInfo(newIndexMetaData, previousIndexMetaData, writeReason));
@@ -325,7 +330,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
        return indicesToWrite;
    }

-   public static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set<Index> previouslyWrittenIndices) {
+   public static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState,
+                                                             Set<Index> previouslyWrittenIndices) {
        RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (newRoutingNode == null) {
            throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
@@ -334,7 +340,8 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateA
        for (ShardRouting routing : newRoutingNode) {
            indices.add(routing.index());
        }
-       // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
+       // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if
+       // we have it written on disk previously
        for (IndexMetaData indexMetaData : state.metaData()) {
            boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE);
            // if the index is open we might still have to write the state if it just transitioned from closed to open

server/src/main/java/org/elasticsearch/gateway/GatewayService.java

@@ -64,7 +64,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
    public static final Setting<Integer> RECOVER_AFTER_MASTER_NODES_SETTING =
        Setting.intSetting("gateway.recover_after_master_nodes", 0, 0, Property.NodeScope);

-   public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
+   public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true,
+       false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);

    public static final TimeValue DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET = TimeValue.timeValueMinutes(5);

@@ -185,7 +186,8 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
        } else if (expectedDataNodes != -1 && (nodes.getDataNodes().size() < expectedDataNodes)) { // does not meet the expected...
            enforceRecoverAfterTime = true;
            reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]";
-       } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
+       } else if (expectedMasterNodes != -1 && (nodes.getMasterNodes().size() < expectedMasterNodes)) {
+           // does not meet the expected...
            enforceRecoverAfterTime = true;
            reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.getMasterNodes().size() + "]";
        }

server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java

@@ -71,7 +71,8 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
        this.clusterService = clusterService;
        this.allocationService = allocationService;
        this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
-       transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME, new AllocateDangledRequestHandler());
+       transportService.registerRequestHandler(ACTION_NAME, AllocateDangledRequest::new, ThreadPool.Names.SAME,
+           new AllocateDangledRequestHandler());
    }

    public void allocateDangled(Collection<IndexMetaData> indices, final Listener listener) {
@@ -81,7 +82,8 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
            listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request"));
            return;
        }
-       AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()]));
+       AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(),
+           indices.toArray(new IndexMetaData[indices.size()]));
        transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler<AllocateDangledResponse>() {
            @Override
            public AllocateDangledResponse read(StreamInput in) throws IOException {
@@ -159,15 +161,18 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
                            minIndexCompatibilityVersion);
                    } catch (Exception ex) {
                        // upgrade failed - adding index as closed
-                       logger.warn(() -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex);
-                       upgradedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).version(indexMetaData.getVersion() + 1).build();
+                       logger.warn(() -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be " +
+                           "upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex);
+                       upgradedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)
+                           .version(indexMetaData.getVersion() + 1).build();
                    }
                    metaData.put(upgradedIndexMetaData, false);
                    blocks.addBlocks(upgradedIndexMetaData);
                    if (upgradedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
                        routingTableBuilder.addAsFromDangling(upgradedIndexMetaData);
                    }
-                   sb.append("[").append(upgradedIndexMetaData.getIndex()).append("/").append(upgradedIndexMetaData.getState()).append("]");
+                   sb.append("[").append(upgradedIndexMetaData.getIndex()).append("/").append(upgradedIndexMetaData.getState())
+                       .append("]");
                }
                if (!importNeeded) {
                    return currentState;
@@ -175,7 +180,8 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
                logger.info("auto importing dangled indices {} from [{}]", sb.toString(), request.fromNode);

                RoutingTable routingTable = routingTableBuilder.build();
-               ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks).routingTable(routingTable).build();
+               ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks)
+                   .routingTable(routingTable).build();

                // now, reroute
                return allocationService.reroute(

server/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

@@ -257,9 +257,13 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
            } else {
                final String finalAllocationId = allocationId;
                if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
-                   logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
+                   logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
+                       "opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId),
+                       nodeShardState.storeException());
                } else {
-                   logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
+                   logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
+                       "opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId),
+                       nodeShardState.storeException());
                    allocationId = null;
                }
            }
@@ -267,7 +271,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
            if (allocationId != null) {
                assert nodeShardState.storeException() == null ||
                    nodeShardState.storeException() instanceof ShardLockObtainFailedException :
-                   "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException();
+                   "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " +
+                       "store throwing " + nodeShardState.storeException();
                numberOfAllocationsFound++;
                if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
                    nodeShardStates.add(nodeShardState);
@@ -280,7 +285,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
            // prefer shards with matching allocation ids
            Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
                (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
-           comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR);
+           comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
+               .thenComparing(PRIMARY_FIRST_COMPARATOR);
        } else {
            comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
        }
@@ -288,7 +294,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
        nodeShardStates.sort(comparator);

        if (logger.isTraceEnabled()) {
-           logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));
+           logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName())
+               .collect(Collectors.joining(", ")));
        }
        return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
    }

server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

@@ -121,10 +121,13 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
                    logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
                        currentNode, nodeWithHighestMatch);
                    UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
-                       "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+ nodeWithHighestMatch + "]",
-                       null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT);
+                       "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+
+                           nodeWithHighestMatch + "]",
+                       null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
+                       UnassignedInfo.AllocationStatus.NO_ATTEMPT);
                    // don't cancel shard in the loop as it will cause a ConcurrentModificationException
-                   shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes()));
+                   shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo,
+                       metaData.getIndexSafe(shard.index()), allocation.changes()));
                }
            }
        }
@@ -298,7 +301,8 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
    /**
     * Finds the store for the assigned shard in the fetched data, returns null if none is found.
     */
-   private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation, AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
+   private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
+                                                                             AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
        assert shard.currentNodeId() != null;
        DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
        if (primaryNode == null) {

server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java

@@ -106,18 +106,22 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
        client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
        client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
        client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
-       client.admin().cluster().prepareDeleteStoredScript("id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
-       client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
+       client.admin().cluster().prepareDeleteStoredScript("id")
+           .execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
+       client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON)
+           .execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));

        // choosing arbitrary cluster admin actions to test
        client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
-       client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
+       client.admin().cluster().prepareCreateSnapshot("repo", "bck")
+           .execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
        client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));

        // choosing arbitrary indices admin actions to test
        client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
        client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
-       client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
+       client.admin().indices().prepareClearCache("idx1", "idx2")
+           .execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
        client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
    }

server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java

@@ -47,7 +47,8 @@ public class BlockingClusterStatePublishResponseHandlerTests extends ESTestCase
        final Logger logger;
        final BlockingClusterStatePublishResponseHandler handler;

-       PublishResponder(boolean fail, DiscoveryNode node, CyclicBarrier barrier, Logger logger, BlockingClusterStatePublishResponseHandler handler) {
+       PublishResponder(boolean fail, DiscoveryNode node, CyclicBarrier barrier, Logger logger,
+                        BlockingClusterStatePublishResponseHandler handler) {
            this.fail = fail;

            this.node = node;
@@ -80,7 +81,8 @@ public class BlockingClusterStatePublishResponseHandlerTests extends ESTestCase
            allNodes[i] = node;
        }

-       BlockingClusterStatePublishResponseHandler handler = new BlockingClusterStatePublishResponseHandler(new HashSet<>(Arrays.asList(allNodes)));
+       BlockingClusterStatePublishResponseHandler handler =
+           new BlockingClusterStatePublishResponseHandler(new HashSet<>(Arrays.asList(allNodes)));

        int firstRound = randomIntBetween(5, nodeCount - 1);
        Thread[] threads = new Thread[firstRound];

server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -106,13 +106,16 @@ public class ZenDiscoveryUnitTests extends ESTestCase {

        currentState.version(2);
        newState.version(1);
-       assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
+       assertTrue("should ignore, because new state's version is lower to current state's version",
+           shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
        currentState.version(1);
        newState.version(1);
-       assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
+       assertTrue("should ignore, because new state's version is equal to current state's version",
+           shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
        currentState.version(1);
        newState.version(2);
-       assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
+       assertFalse("should not ignore, because new state's version is higher to current state's version",
+           shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));

        currentNodes = DiscoveryNodes.builder();
        currentNodes.masterNodeId("b").add(new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT));
@@ -144,7 +147,8 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
            currentState.version(1);
            newState.version(2);
        }
-       assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
+       assertFalse("should not ignore, because current state doesn't have a master",
+           shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
    }

    public void testFilterNonMasterPingResponse() {
@@ -311,8 +315,10 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
                listener.onSuccess(source);
            }
        };
-       ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
-           masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), ESAllocationTestCase.createAllocationService(),
+       ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service,
+           new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
+           masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
+           ESAllocationTestCase.createAllocationService(),
            Collections.emptyList());
        zenDiscovery.start();
        return zenDiscovery;
@@ -341,8 +347,9 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
            (() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
        final boolean incompatible = randomBoolean();
        IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
-           .put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
-               : VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT))
+           .put(SETTING_VERSION_CREATED,
+               incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
+                   : VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT))
            .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
            .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
            .state(IndexMetaData.State.OPEN)

server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java

@@ -58,7 +58,8 @@ public class GatewayServiceTests extends ESTestCase {
        // ensure settings override default
        TimeValue timeValue = TimeValue.timeValueHours(3);
        // ensure default is set when setting expected_nodes
-       service = createService(Settings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time", timeValue.toString()));
+       service = createService(Settings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time",
+           timeValue.toString()));
        assertThat(service.recoverAfterTime().millis(), Matchers.equalTo(timeValue.millis()));
    }
}

@ -102,7 +102,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
|
|||
}
|
||||
final long id = addDummyFiles("foo-", dirs);
|
||||
Format format = new Format("foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
|
||||
randomDouble(), randomBoolean());
|
||||
format.write(state, dirs);
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content("*", file);
|
||||
|
@ -116,7 +117,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
|
|||
DummyState read = format.read(NamedXContentRegistry.EMPTY, list[0]);
|
||||
assertThat(read, equalTo(state));
|
||||
}
|
||||
DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
|
||||
randomDouble(), randomBoolean());
|
||||
format.write(state2, dirs);
|
||||
|
||||
for (Path file : dirs) {
|
||||
|
@ -142,7 +144,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
|
|||
final long id = addDummyFiles("foo-", dirs);
|
||||
|
||||
Format format = new Format("foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
|
||||
randomDouble(), randomBoolean());
|
||||
format.write(state, dirs);
|
||||
for (Path file : dirs) {
|
||||
Path[] list = content("*", file);
|
||||
|
@ -165,7 +168,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
|
|||
}
|
||||
final long id = addDummyFiles("foo-", dirs);
|
||||
Format format = new Format("foo-");
|
||||
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean());
DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(),
randomDouble(), randomBoolean());
format.write(state, dirs);
for (Path file : dirs) {
Path[] list = content("*", file);
@@ -207,7 +211,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
byte newValue = (byte) ~oldValue;
bb.put(0, newValue);
raf.write(bb, filePointer);
logger.debug("Corrupting file {} -- flipping at position {} from {} to {} ", fileToCorrupt.getFileName().toString(), filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue));
logger.debug("Corrupting file {} -- flipping at position {} from {} to {} ", fileToCorrupt.getFileName().toString(),
filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue));
}
long checksumAfterCorruption;
long actualChecksumAfterCorruption;
@@ -221,7 +226,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
msg.append(" after: [").append(checksumAfterCorruption).append("]");
msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
msg.append(" file: ").append(fileToCorrupt.getFileName().toString()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
msg.append(" file: ").append(fileToCorrupt.getFileName().toString()).append(" length: ")
.append(dir.fileLength(fileToCorrupt.getFileName().toString()));
logger.debug("{}", msg.toString());
assumeTrue("Checksum collision - " + msg.toString(),
checksumAfterCorruption != checksumBeforeCorruption // collision
@@ -243,7 +249,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
Files.createDirectories(dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME));
for (int j = 0; j < numStates; j++) {
format.write(meta.get(j), dirs[i]);
if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily need here....
if (randomBoolean() && (j < numStates - 1 || dirs.length > 0 && i != 0)) { // corrupt a file that we do not necessarily
// need here....
Path file = dirs[i].resolve(MetaDataStateFormat.STATE_DIR_NAME).resolve("global-" + j + ".st");
corruptedFiles.add(file);
MetaDataStateFormatTests.corruptFile(file, logger);
@@ -320,7 +327,8 @@ public class MetaDataStateFormatTests extends ESTestCase {

private IndexMetaData.Builder indexBuilder(String index) throws IOException {
return IndexMetaData.builder(index)
.settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10)).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)));
.settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 5)));
}


@@ -471,7 +479,8 @@ public class MetaDataStateFormatTests extends ESTestCase {
} else {
realId = Math.max(realId, id);
}
try (OutputStream stream = Files.newOutputStream(stateDir.resolve(actualPrefix + id + MetaDataStateFormat.STATE_FILE_EXTENSION))) {
try (OutputStream stream =
Files.newOutputStream(stateDir.resolve(actualPrefix + id + MetaDataStateFormat.STATE_FILE_EXTENSION))) {
stream.write(0);
}
}
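
The wraps in these hunks exist to satisfy checkstyle's LineLength check, which this commit re-enables for the affected packages by deleting their suppression entries. As a rough sketch of the check being un-suppressed (the 140-column limit and the module's placement, which has varied between TreeWalker and Checker across checkstyle versions, are assumptions about the project's setup at the time, not part of this diff):

    <!-- enforce a maximum line length; the 140-column value is an assumption -->
    <module name="LineLength">
        <property name="max" value="140"/>
    </module>
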
@@ -63,7 +63,8 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
String node2 = nodeNames.get(1);

String index = "index";
assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0).put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1)));
assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0)
.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1)));
index(index, "_doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
ensureGreen();
assertIndexInMetaState(node1, index);
@@ -72,7 +73,8 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
assertIndexInMetaState(masterNode, index);

logger.debug("relocating index...");
client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node2)).get();
client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node2)).get();
client().admin().cluster().prepareHealth().setWaitForNoRelocatingShards(true).get();
ensureGreen();
assertIndexDirectoryDeleted(node1, resolveIndex);
@@ -109,11 +111,13 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
.endObject()).get();

GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(index).addTypes("_doc").get();
assertNotNull(((Map<String,?>) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))).get("integer_field"));
assertNotNull(((Map<String,?>) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties")))
.get("integer_field"));

// make sure it was also written on red node although index is closed
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = getIndicesMetaDataOnNode(dataNode);
assertNotNull(((Map<String,?>) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get("integer_field"));
assertNotNull(((Map<String,?>) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties")))
.get("integer_field"));
assertThat(indicesMetaData.get(index).getState(), equalTo(IndexMetaData.State.CLOSE));

/* Try the same and see if this also works if node was just restarted.
@@ -134,11 +138,13 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
.endObject()).get();

getMappingsResponse = client().admin().indices().prepareGetMappings(index).addTypes("_doc").get();
assertNotNull(((Map<String,?>) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties"))).get("float_field"));
assertNotNull(((Map<String,?>) (getMappingsResponse.getMappings().get(index).get("_doc").getSourceAsMap().get("properties")))
.get("float_field"));

// make sure it was also written on red node although index is closed
indicesMetaData = getIndicesMetaDataOnNode(dataNode);
assertNotNull(((Map<String,?>) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties"))).get("float_field"));
assertNotNull(((Map<String,?>) (indicesMetaData.get(index).getMappings().get("_doc").getSourceAsMap().get("properties")))
.get("float_field"));
assertThat(indicesMetaData.get(index).getState(), equalTo(IndexMetaData.State.CLOSE));

// finally check that meta data is also written of index opened again
@@ -152,7 +158,8 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
protected void assertIndexDirectoryDeleted(final String nodeName, final Index index) throws Exception {
assertBusy(() -> {
logger.info("checking if index directory exists...");
assertFalse("Expecting index directory of " + index + " to be deleted from node " + nodeName, indexDirectoryExists(nodeName, index));
assertFalse("Expecting index directory of " + index + " to be deleted from node " + nodeName,
indexDirectoryExists(nodeName, index));
}
);
}
@@ -161,7 +168,8 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase {
assertBusy(() -> {
logger.info("checking if meta state exists...");
try {
assertTrue("Expecting meta state of index " + indexName + " to be on node " + nodeName, getIndicesMetaDataOnNode(nodeName).containsKey(indexName));
assertTrue("Expecting meta state of index " + indexName + " to be on node " + nodeName,
getIndicesMetaDataOnNode(nodeName).containsKey(indexName));
} catch (Exception e) {
logger.info("failed to load meta state", e);
fail("could not load meta state");

@@ -118,7 +118,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
}

/**
* Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned.
* Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore
* unassigned.
*/
public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, "id2");
@@ -155,9 +156,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node1.getId()));
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo("allocId1"));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

@@ -177,9 +180,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node2.getId()));
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo(allocId2));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

@@ -187,16 +192,18 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
*/
public void testFoundAllocationAndAllocating() {
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED),
"allocId1");
final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(),
randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), "allocId1");
testAllocator.addData(node1, "allocId1", randomBoolean());
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node1.getId()));
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(),
equalTo("allocId1"));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

@@ -284,7 +291,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2;
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(allocatedNode.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(allocatedNode.getId()));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

@@ -315,7 +323,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node1.getId()));
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

@@ -475,12 +484,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
if (data == null) {
data = new HashMap<>();
}
data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException));
data.put(node,
new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, allocationId, primary, storeException));
return this;
}

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
fetchData(ShardRouting shard, RoutingAllocation allocation) {
return new AsyncShardFetch.FetchResult<>(shardId, data, Collections.<String>emptySet());
}
}

@@ -111,7 +111,8 @@ public class PriorityComparatorTests extends ESTestCase {

for (int i = 0; i < indices.length; i++) {
if (frequently()) {
indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i), randomIntBetween(1, 1000), randomIntBetween(1, 10000));
indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i), randomIntBetween(1, 1000),
randomIntBetween(1, 10000));
} else { // sometimes just use defaults
indices[i] = new IndexMeta("idx_2015_04_" + String.format(Locale.ROOT, "%02d", i));
}
@@ -121,7 +122,8 @@ public class PriorityComparatorTests extends ESTestCase {
for (int i = 0; i < numShards; i++) {
IndexMeta indexMeta = randomFrom(indices);
shards.add(TestShardRouting.newShardRouting(indexMeta.name, randomIntBetween(1, 5), null, null,
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")));
randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()),
"foobar")));
}
shards.sort(new PriorityComparator() {
@Override
@@ -138,13 +140,16 @@ public class PriorityComparatorTests extends ESTestCase {
if (prevMeta.priority == currentMeta.priority) {
if (prevMeta.creationDate == currentMeta.creationDate) {
if (prevMeta.name.equals(currentMeta.name) == false) {
assertTrue("indexName mismatch, expected:" + currentMeta.name + " after " + prevMeta.name + " " + prevMeta.name.compareTo(currentMeta.name), prevMeta.name.compareTo(currentMeta.name) > 0);
assertTrue("indexName mismatch, expected:" + currentMeta.name + " after " + prevMeta.name + " " +
prevMeta.name.compareTo(currentMeta.name), prevMeta.name.compareTo(currentMeta.name) > 0);
}
} else {
assertTrue("creationDate mismatch, expected:" + currentMeta.creationDate + " after " + prevMeta.creationDate, prevMeta.creationDate > currentMeta.creationDate);
assertTrue("creationDate mismatch, expected:" + currentMeta.creationDate + " after " + prevMeta.creationDate,
prevMeta.creationDate > currentMeta.creationDate);
}
} else {
assertTrue("priority mismatch, expected:" + currentMeta.priority + " after " + prevMeta.priority, prevMeta.priority > currentMeta.priority);
assertTrue("priority mismatch, expected:" + currentMeta.priority + " after " + prevMeta.priority,
prevMeta.priority > currentMeta.priority);
}
}
previous = routing;

@@ -75,12 +75,14 @@ public class QuorumGatewayIT extends ESIntegTestCase {
if (numNodes == 1) {
assertTrue(awaitBusy(() -> {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet();
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest()
.waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
}, 30, TimeUnit.SECONDS));
logger.info("--> one node is closed -- index 1 document into the remaining nodes");
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).get();
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3")
.endObject()).get();
assertNoFailures(activeClient.admin().indices().prepareRefresh().get());
for (int i = 0; i < 10; i++) {
assertHitCount(activeClient.prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 3L);

@@ -145,7 +145,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
} else {
assertThat("number of terms changed for index [" + index + "]", current.length, equalTo(previous.length));
for (int shard = 0; shard < current.length; shard++) {
assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard], greaterThan(previous[shard]));
assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard],
greaterThan(previous[shard]));
}
result.put(index, current);
}
@@ -158,7 +159,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNode();

String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject()
.startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer")
.endObject().endObject()
.endObject().endObject());
// note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test.
int numberOfShards = numberOfShards();
@@ -243,9 +245,11 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {

public void testSingleNodeWithFlush() throws Exception {
internalCluster().startNode();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute()
.actionGet();
flush();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute()
.actionGet();
refresh();

assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2);
@@ -280,9 +284,11 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
final String firstNode = internalCluster().startNode();
internalCluster().startNode();

client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute()
.actionGet();
flush();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute()
.actionGet();
refresh();

logger.info("Running Cluster Health (wait for the shards to startup)");
@@ -321,9 +327,11 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build());

assertAcked(client().admin().indices().prepareCreate("test"));
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute()
.actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute()
.actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();

logger.info("--> running cluster_health (wait for the shards to startup)");
@@ -340,7 +348,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().stopRandomDataNode();

logger.info("--> one node is closed - start indexing data into the second one");
client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute()
.actionGet();
// TODO: remove once refresh doesn't fail immediately if there a master block:
// https://github.com/elastic/elasticsearch/issues/9997
// client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get();
@@ -361,7 +370,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
.startObject("field2").field("type", "keyword").field("store", true).endObject()
.endObject().endObject().endObject())
.execute().actionGet();
client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
client().admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute()
.actionGet();

logger.info("--> stopping the second node");
internalCluster().stopRandomDataNode();
@@ -476,10 +486,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertThat("bytes should have been recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
// we have to recover the segments file since we commit the translog ID on engine startup
assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(), equalTo(reused));
assertThat("all existing files should be reused, byte count mismatch", recoveryState.getIndex().reusedBytes(),
equalTo(reused));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(filesRecovered));
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(), equalTo(filesReused));
assertThat("the segment from the last round of indexing should be recovered", recoveryState.getIndex().recoveredFileCount(),
equalTo(filesRecovered));
assertThat("all existing files should be reused, file count mismatch", recoveryState.getIndex().reusedFileCount(),
equalTo(filesReused));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - filesRecovered));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat("no translog ops should be recovered", recoveryState.getTranslog().recoveredOperations(), equalTo(0));
@@ -498,12 +511,14 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
// we need different data paths so we make sure we start the second node fresh

final Path pathNode1 = createTempDir();
final String node_1 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build());
final String node_1 =
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode1).build());

client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();

final Path pathNode2 = createTempDir();
final String node_2 = internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());
final String node_2 =
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), pathNode2).build());

ensureGreen();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);

@@ -95,7 +95,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
* the shard allocator to allocate it. There isn't a copy around to find anyhow.
*/
public void testNoAsyncFetchOnIndexCreation() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY, UnassignedInfo.Reason.INDEX_CREATED);
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY,
UnassignedInfo.Reason.INDEX_CREATED);
testAllocator.clean();
testAllocator.allocateUnassigned(allocation);
assertThat(testAllocator.getFetchDataCalledAndClean(), equalTo(false));
@@ -108,7 +109,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
* and find a better copy for the shard.
*/
public void testAsyncFetchOnAnythingButIndexCreation() {
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), EnumSet.complementOf(EnumSet.of(UnassignedInfo.Reason.INDEX_CREATED)));
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(),
EnumSet.complementOf(EnumSet.of(UnassignedInfo.Reason.INDEX_CREATED)));
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY, reason);
testAllocator.clean();
testAllocator.allocateUnassigned(allocation);
@@ -125,7 +127,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.addData(nodeToMatch, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(nodeToMatch.getId()));
}

/**
@@ -138,7 +141,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.addData(nodeToMatch, "MATCH", new StoreFileMetaData("file1", 10, "NO_MATCH_CHECKSUM" ,MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(nodeToMatch.getId()));
}

/**
@@ -151,7 +155,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.addData(nodeToMatch, "NO_MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(nodeToMatch.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(nodeToMatch.getId()));
}

/**
@@ -198,7 +203,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
* moves to the ignore unassigned list.
*/
public void testNoOrThrottleDecidersRemainsInUnassigned() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(randomBoolean() ? noAllocationDeciders() : throttleAllocationDeciders());
RoutingAllocation allocation =
onePrimaryOnNode1And1Replica(randomBoolean() ? noAllocationDeciders() : throttleAllocationDeciders());
testAllocator.addData(node1, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
@@ -246,12 +252,14 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));

allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(),
TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(node2.getId()));
}

public void testCancelRecoveryBetterSyncId() {
@@ -330,7 +338,9 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(primaryShard)
.addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)))
.addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false,
ShardRoutingState.INITIALIZING,
new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)))
.build())
)
.build();
@@ -380,13 +390,15 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
}

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>
fetchData(ShardRouting shard, RoutingAllocation allocation) {
fetchDataCalled.set(true);
Map<DiscoveryNode, TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> tData = null;
if (data != null) {
tData = new HashMap<>();
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> entry : data.entrySet()) {
tData.put(entry.getKey(), new TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData(entry.getKey(), entry.getValue()));
tData.put(entry.getKey(),
new TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData(entry.getKey(), entry.getValue()));
}
}
return new AsyncShardFetch.FetchResult<>(shardId, tData, Collections.emptySet());

@@ -87,7 +87,8 @@ public class ReusePeerRecoverySharedTest {

// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)).get();
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(),
EnableAllocationDecider.Allocation.NONE)).get();
logger.info("--> full cluster restart");
restartCluster.run();

@@ -102,7 +103,8 @@ public class ReusePeerRecoverySharedTest {
logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings().setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(),
EnableAllocationDecider.Allocation.NONE))
.get();
logger.info("--> full cluster restart");
restartCluster.run();
@@ -138,7 +140,8 @@ public class ReusePeerRecoverySharedTest {
} else {
if (useSyncIds && !recoveryState.getPrimary()) {
logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(),
recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
}
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
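
For context, a suppressions file like the one trimmed in this commit is typically wired into the checkstyle configuration through a SuppressionFilter module; a minimal sketch (the property value and its expansion are assumptions, not taken from this commit):

    <module name="SuppressionFilter">
        <!-- points at the suppressions file whose entries this commit deletes -->
        <property name="file" value="${suppressions}"/>
    </module>

Once an entry is removed from that file, any over-length line in the corresponding source file fails the check again, which is why every long line in these packages is wrapped in the hunks above.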