mirror of
https://github.com/apache/lucene.git
synced 2025-02-23 02:35:02 +00:00
SOLR-7336: Add State enum to Replica
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1671240 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a4be57c1a1
commit
2d15d935f0
@ -71,6 +71,9 @@ Upgrading from Solr 5.1
|
||||
* SOLR-7325: Slice.getState() now returns a State enum instead of a String. This helps
|
||||
clarify the states a Slice can be in, as well comparing the state of a Slice.
|
||||
(Shai Erera)
|
||||
|
||||
* SOLR-7336: Added Replica.getState() and removed ZkStateReader state-related constants.
|
||||
You should use Replica.State to compare a replica's state. (Shai Erera)
|
||||
|
||||
Detailed Change List
|
||||
----------------------
|
||||
|
@ -17,14 +17,15 @@ package org.apache.solr.cloud;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.util.PropertiesUtil;
|
||||
|
||||
import java.util.Properties;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
public class CloudDescriptor {
|
||||
|
||||
@ -43,7 +44,7 @@ public class CloudDescriptor {
|
||||
volatile String shardParent = null;
|
||||
|
||||
volatile boolean isLeader = false;
|
||||
volatile String lastPublished = ZkStateReader.ACTIVE;
|
||||
volatile Replica.State lastPublished = Replica.State.ACTIVE;
|
||||
|
||||
public static final String NUM_SHARDS = "numShards";
|
||||
|
||||
@ -61,7 +62,7 @@ public class CloudDescriptor {
|
||||
this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
|
||||
}
|
||||
|
||||
public String getLastPublished() {
|
||||
public Replica.State getLastPublished() {
|
||||
return lastPublished;
|
||||
}
|
||||
|
||||
|
@ -161,7 +161,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
|
||||
leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
|
||||
ZkStateReader.CORE_NAME_PROP,
|
||||
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
|
||||
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
|
||||
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
|
||||
}
|
||||
}
|
||||
@ -384,11 +384,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||
if (coreNodeName.equals(replicaCoreNodeName))
|
||||
continue; // added safe-guard so we don't mark this core as down
|
||||
|
||||
String lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
|
||||
if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERY_FAILED.equals(lirState)) {
|
||||
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
|
||||
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
|
||||
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
|
||||
+ lirState + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
|
||||
List<ZkCoreNodeProps> replicaProps =
|
||||
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
|
||||
List<ZkCoreNodeProps> replicaProps =
|
||||
zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
|
||||
|
||||
if (replicaProps != null && replicaProps.size() > 0) {
|
||||
@ -507,15 +507,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished().equals(ZkStateReader.ACTIVE)) {
|
||||
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
|
||||
|
||||
// maybe active but if the previous leader marked us as down and
|
||||
// we haven't recovered, then can't be leader
|
||||
String lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
|
||||
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
|
||||
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
||||
if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERING.equals(lirState)) {
|
||||
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
|
||||
log.warn("Although my last published state is Active, the previous leader marked me "+core.getName()
|
||||
+ " as " + lirState
|
||||
+ " as " + lirState.toString()
|
||||
+ " and I haven't recovered yet, so I shouldn't be the leader.");
|
||||
return false;
|
||||
}
|
||||
|
@ -193,8 +193,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
||||
if (collection != null && shardId != null) {
|
||||
try {
|
||||
// call out to ZooKeeper to get the leader-initiated recovery state
|
||||
String lirState =
|
||||
zkController.getLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName);
|
||||
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName);
|
||||
|
||||
if (lirState == null) {
|
||||
log.warn("Stop trying to send recovery command to downed replica core="+coreNeedingRecovery+
|
||||
@ -203,7 +202,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
||||
break;
|
||||
}
|
||||
|
||||
if (ZkStateReader.RECOVERING.equals(lirState)) {
|
||||
if (lirState == Replica.State.RECOVERING) {
|
||||
// replica has ack'd leader initiated recovery and entered the recovering state
|
||||
// so we don't need to keep looping to send the command
|
||||
continueTrying = false;
|
||||
@ -216,12 +215,12 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
||||
zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName);
|
||||
if (replicaProps != null && replicaProps.size() > 0) {
|
||||
for (ZkCoreNodeProps prop : replicaProps) {
|
||||
if (replicaCoreNodeName.equals(((Replica) prop.getNodeProps()).getName())) {
|
||||
String replicaState = prop.getState();
|
||||
if (ZkStateReader.ACTIVE.equals(replicaState)) {
|
||||
final Replica replica = (Replica) prop.getNodeProps();
|
||||
if (replicaCoreNodeName.equals(replica.getName())) {
|
||||
if (replica.getState() == Replica.State.ACTIVE) {
|
||||
// replica published its state as "active",
|
||||
// which is bad if lirState is still "down"
|
||||
if (ZkStateReader.DOWN.equals(lirState)) {
|
||||
if (lirState == Replica.State.DOWN) {
|
||||
// OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
|
||||
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
|
||||
log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
|
||||
|
@ -517,7 +517,7 @@ public class Overseer implements Closeable {
|
||||
}
|
||||
|
||||
private boolean isActive(Replica replica) {
|
||||
return ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP));
|
||||
return replica.getState() == Replica.State.ACTIVE;
|
||||
}
|
||||
|
||||
// Collect a list of all the nodes that _can_ host the indicated property. Along the way, also collect any of
|
||||
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
|
||||
import org.apache.solr.common.SolrException;
|
||||
@ -271,13 +272,13 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
|
||||
for (Replica replica : replicas) {
|
||||
// on a live node?
|
||||
boolean live = clusterState.liveNodesContain(replica.getNodeName());
|
||||
String state = replica.getStr(ZkStateReader.STATE_PROP);
|
||||
final Replica.State state = replica.getState();
|
||||
|
||||
boolean okayState = (state.equals(ZkStateReader.DOWN)
|
||||
|| state.equals(ZkStateReader.RECOVERING) || state
|
||||
.equals(ZkStateReader.ACTIVE));
|
||||
final boolean okayState = state == Replica.State.DOWN
|
||||
|| state == Replica.State.RECOVERING
|
||||
|| state == Replica.State.ACTIVE;
|
||||
|
||||
log.debug("Process replica name={} live={} state={}", replica.getName(), live, state);
|
||||
log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
|
||||
|
||||
if (live && okayState) {
|
||||
goodReplicas++;
|
||||
@ -395,13 +396,10 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
|
||||
if (replicas != null) {
|
||||
log.debug("check if replica already exists on node using replicas {}", getNames(replicas));
|
||||
for (Replica replica : replicas) {
|
||||
final Replica.State state = replica.getState();
|
||||
if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
|
||||
&& clusterState.liveNodesContain(replica.getNodeName())
|
||||
&& (replica.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE)
|
||||
|| replica.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.DOWN) || replica.getStr(
|
||||
ZkStateReader.STATE_PROP).equals(ZkStateReader.RECOVERING))) {
|
||||
&& (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
|
||||
log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName());
|
||||
return true;
|
||||
}
|
||||
|
@ -17,6 +17,11 @@ package org.apache.solr.cloud;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import static org.apache.solr.cloud.Assign.*;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.*;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
|
||||
import static org.apache.solr.common.params.CommonParams.*;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -37,7 +42,6 @@ import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
@ -94,41 +98,13 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERSTATUS;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
|
||||
public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
|
||||
public static final String NUM_SLICES = "numShards";
|
||||
|
||||
// @Deprecated- see on ZkStateReader
|
||||
public static final String REPLICATION_FACTOR = "replicationFactor";
|
||||
|
||||
// @Deprecated- see on ZkStateReader
|
||||
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
|
||||
|
||||
static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
|
||||
public static final String CREATE_NODE_SET_SHUFFLE = "createNodeSet.shuffle";
|
||||
public static final String CREATE_NODE_SET = "createNodeSet";
|
||||
@ -582,7 +558,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
case RELOAD:
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
|
||||
collectionCmd(zkStateReader.getClusterState(), message, params, results, ZkStateReader.ACTIVE);
|
||||
collectionCmd(zkStateReader.getClusterState(), message, params, results, Replica.State.ACTIVE);
|
||||
break;
|
||||
case CREATEALIAS:
|
||||
createAlias(zkStateReader.getAliases(), message);
|
||||
@ -932,12 +908,12 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
|
||||
for (Object nextReplica : replicas.values()) {
|
||||
Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
|
||||
if (!ZkStateReader.DOWN.equals(replicaMap.get(ZkStateReader.STATE_PROP))) {
|
||||
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
|
||||
// not down, so verify the node is live
|
||||
String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
|
||||
if (!liveNodes.contains(node_name)) {
|
||||
// node is not live, so this replica is actually down
|
||||
replicaMap.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
|
||||
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1046,8 +1022,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
|
||||
// If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
|
||||
// on the command.
|
||||
if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) &&
|
||||
ZkStateReader.DOWN.equals(replica.getStr(ZkStateReader.STATE_PROP)) == false) {
|
||||
if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
|
||||
throw new SolrException(ErrorCode.BAD_REQUEST, "Attempted to remove replica : " + collectionName + "/" +
|
||||
shard + "/" + replicaName +
|
||||
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
|
||||
@ -1073,7 +1048,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
log.warn("Exception trying to unload core " + sreq, e);
|
||||
}
|
||||
|
||||
collectShardResponses(!ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP)) ? new NamedList() : results,
|
||||
collectShardResponses(replica.getState() != Replica.State.ACTIVE ? new NamedList() : results,
|
||||
false, null, shardHandler);
|
||||
|
||||
if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000))
|
||||
@ -1586,7 +1561,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
cmd.setCoreName(subShardName);
|
||||
cmd.setNodeName(nodeName);
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(ZkStateReader.ACTIVE);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
|
||||
@ -1709,7 +1684,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
cmd.setCoreName(subShardNames.get(i - 1));
|
||||
cmd.setNodeName(subShardNodeName);
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(ZkStateReader.RECOVERING);
|
||||
cmd.setState(Replica.State.RECOVERING);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
|
||||
@ -2105,7 +2080,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
cmd.setCoreName(tempCollectionReplica1);
|
||||
cmd.setNodeName(sourceLeader.getNodeName());
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(ZkStateReader.ACTIVE);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
// we don't want this to happen asynchronously
|
||||
@ -2164,7 +2139,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
cmd.setCoreName(tempSourceLeader.getStr("core"));
|
||||
cmd.setNodeName(targetLeader.getNodeName());
|
||||
cmd.setCoreNodeName(coreNodeName);
|
||||
cmd.setState(ZkStateReader.ACTIVE);
|
||||
cmd.setState(Replica.State.ACTIVE);
|
||||
cmd.setCheckLive(true);
|
||||
cmd.setOnlyIfLeader(true);
|
||||
params = new ModifiableSolrParams(cmd.getParams());
|
||||
@ -2406,7 +2381,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
ZkStateReader.COLLECTION_PROP, collectionName,
|
||||
ZkStateReader.SHARD_ID_PROP, sliceName,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP,baseUrl);
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
|
||||
}
|
||||
@ -2551,7 +2526,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.SHARD_ID_PROP, shard,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node));
|
||||
Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
|
||||
params.set(CoreAdminParams.CORE_NODE_NAME, waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
|
||||
@ -2661,7 +2636,7 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
|
||||
}
|
||||
|
||||
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
|
||||
private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, Replica.State stateMatcher) {
|
||||
log.info("Executing Collection Cmd : " + params);
|
||||
String collectionName = message.getStr(NAME);
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
@ -2677,18 +2652,18 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
|
||||
|
||||
}
|
||||
|
||||
private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher,
|
||||
private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
|
||||
Slice slice, ShardHandler shardHandler) {
|
||||
Map<String,Replica> shards = slice.getReplicasMap();
|
||||
Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
|
||||
for (Map.Entry<String,Replica> shardEntry : shardEntries) {
|
||||
final ZkNodeProps node = shardEntry.getValue();
|
||||
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
|
||||
if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))
|
||||
&& (stateMatcher == null || Replica.State.getState(node.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
|
||||
// For thread safety, only simple clone the ModifiableSolrParams
|
||||
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
|
||||
cloneParams.add(params);
|
||||
cloneParams.set(CoreAdminParams.CORE,
|
||||
node.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
cloneParams.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
|
||||
String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
ShardRequest sreq = new ShardRequest();
|
||||
|
@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClosableThread;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
@ -126,7 +127,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
|
||||
SolrException.log(log, "Recovery failed - I give up. core=" + coreName);
|
||||
try {
|
||||
zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
|
||||
zkController.publish(cd, Replica.State.RECOVERY_FAILED);
|
||||
} finally {
|
||||
close();
|
||||
recoveryListener.failed();
|
||||
@ -338,12 +339,12 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||
// we are now the leader - no one else must have been suitable
|
||||
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
|
||||
log.info("Finished recovery process. core=" + coreName);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Publishing state of core "+core.getName()+" as recovering, leader is "+leaderUrl+" and I am "+ourUrl);
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
|
||||
|
||||
|
||||
final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
||||
@ -413,8 +414,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||
}
|
||||
|
||||
// sync success - register as active and return
|
||||
zkController.publish(core.getCoreDescriptor(),
|
||||
ZkStateReader.ACTIVE);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
successfulRecovery = true;
|
||||
close = true;
|
||||
return;
|
||||
@ -453,7 +453,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||
|
||||
log.info("Replication Recovery was successful - registering as Active. core=" + coreName);
|
||||
// if there are pending recovery requests, don't advert as active
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
close = true;
|
||||
successfulRecovery = true;
|
||||
recoveryListener.recovered();
|
||||
@ -577,7 +577,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(zkController.getNodeName());
|
||||
prepCmd.setCoreNodeName(coreZkNodeName);
|
||||
prepCmd.setState(ZkStateReader.RECOVERING);
|
||||
prepCmd.setState(Replica.State.RECOVERING);
|
||||
prepCmd.setCheckLive(true);
|
||||
prepCmd.setOnlyIfLeader(true);
|
||||
final Slice.State state = slice.getState();
|
||||
|
@ -106,7 +106,7 @@ import org.slf4j.MDC;
|
||||
*/
|
||||
public final class ZkController {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(ZkController.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(ZkController.class);
|
||||
|
||||
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
|
||||
|
||||
@ -379,7 +379,7 @@ public final class ZkController {
|
||||
for (CoreDescriptor descriptor : descriptors) {
|
||||
try {
|
||||
descriptor.getCloudDescriptor().setLeader(false);
|
||||
publish(descriptor, ZkStateReader.DOWN, updateLastPublished);
|
||||
publish(descriptor, Replica.State.DOWN, updateLastPublished);
|
||||
} catch (Exception e) {
|
||||
if (isClosed) {
|
||||
return;
|
||||
@ -390,7 +390,7 @@ public final class ZkController {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
try {
|
||||
publish(descriptor, ZkStateReader.DOWN);
|
||||
publish(descriptor, Replica.State.DOWN);
|
||||
} catch (Exception e2) {
|
||||
SolrException.log(log, "", e2);
|
||||
continue;
|
||||
@ -668,10 +668,9 @@ public final class ZkController {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if (getNodeName().equals(replica.getNodeName())
|
||||
&& !(replica.getStr(ZkStateReader.STATE_PROP)
|
||||
.equals(ZkStateReader.DOWN))) {
|
||||
&& replica.getState() != Replica.State.DOWN) {
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
|
||||
ZkStateReader.CORE_NAME_PROP,
|
||||
replica.getStr(ZkStateReader.CORE_NAME_PROP),
|
||||
@ -702,8 +701,7 @@ public final class ZkController {
|
||||
for (Slice slice : slices) {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if (replica.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.DOWN)) {
|
||||
if (replica.getState() == Replica.State.DOWN) {
|
||||
updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
|
||||
}
|
||||
@ -903,7 +901,7 @@ public final class ZkController {
|
||||
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
|
||||
collection, coreZkNodeName, shardId, leaderProps, core, cc);
|
||||
if (!didRecovery) {
|
||||
publish(desc, ZkStateReader.ACTIVE);
|
||||
publish(desc, Replica.State.ACTIVE);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1072,9 +1070,9 @@ public final class ZkController {
|
||||
}
|
||||
|
||||
// see if the leader told us to recover
|
||||
String lirState = getLeaderInitiatedRecoveryState(collection, shardId,
|
||||
final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId,
|
||||
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
|
||||
if (ZkStateReader.DOWN.equals(lirState)) {
|
||||
if (lirState == Replica.State.DOWN) {
|
||||
log.info("Leader marked core " + core.getName() + " down; starting recovery process");
|
||||
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
|
||||
return true;
|
||||
@ -1091,18 +1089,18 @@ public final class ZkController {
|
||||
return baseURL;
|
||||
}
|
||||
|
||||
public void publish(final CoreDescriptor cd, final String state) throws KeeperException, InterruptedException {
|
||||
public void publish(final CoreDescriptor cd, final Replica.State state) throws KeeperException, InterruptedException {
|
||||
publish(cd, state, true);
|
||||
}
|
||||
|
||||
public void publish(final CoreDescriptor cd, final String state, boolean updateLastState) throws KeeperException, InterruptedException {
|
||||
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws KeeperException, InterruptedException {
|
||||
publish(cd, state, updateLastState, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish core state to overseer.
|
||||
*/
|
||||
public void publish(final CoreDescriptor cd, final String state, boolean updateLastState, boolean forcePublish) throws KeeperException, InterruptedException {
|
||||
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState, boolean forcePublish) throws KeeperException, InterruptedException {
|
||||
if (!forcePublish) {
|
||||
try (SolrCore core = cc.getCore(cd.getName())) {
|
||||
if (core == null || core.isClosed()) {
|
||||
@ -1118,7 +1116,7 @@ public final class ZkController {
|
||||
try {
|
||||
if (cd != null && cd.getName() != null)
|
||||
MDCUtils.setCore(cd.getName());
|
||||
log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
|
||||
log.info("publishing core={} state={} collection={}", cd.getName(), state.toString(), collection);
|
||||
//System.out.println(Thread.currentThread().getStackTrace()[3]);
|
||||
Integer numShards = cd.getCloudDescriptor().getNumShards();
|
||||
if (numShards == null) { //XXX sys prop hack
|
||||
@ -1133,21 +1131,21 @@ public final class ZkController {
|
||||
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
|
||||
// If the leader initiated recovery, then verify that this replica has performed
|
||||
// recovery as requested before becoming active; don't even look at lirState if going down
|
||||
if (!ZkStateReader.DOWN.equals(state)) {
|
||||
String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
|
||||
if (state != Replica.State.DOWN) {
|
||||
final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
|
||||
if (lirState != null) {
|
||||
if (ZkStateReader.ACTIVE.equals(state)) {
|
||||
if (state == Replica.State.ACTIVE) {
|
||||
// trying to become active, so leader-initiated state must be recovering
|
||||
if (ZkStateReader.RECOVERING.equals(lirState)) {
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true);
|
||||
} else if (ZkStateReader.DOWN.equals(lirState)) {
|
||||
if (lirState == Replica.State.RECOVERING) {
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null);
|
||||
} else if (lirState == Replica.State.DOWN) {
|
||||
throw new SolrException(ErrorCode.INVALID_STATE,
|
||||
"Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
|
||||
}
|
||||
} else if (ZkStateReader.RECOVERING.equals(state)) {
|
||||
} else if (state == Replica.State.RECOVERING) {
|
||||
// if it is currently DOWN, then trying to enter into recovering state is good
|
||||
if (ZkStateReader.DOWN.equals(lirState)) {
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true);
|
||||
if (lirState == Replica.State.DOWN) {
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1155,7 +1153,7 @@ public final class ZkController {
|
||||
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(Overseer.QUEUE_OPERATION, "state");
|
||||
props.put(ZkStateReader.STATE_PROP, state);
|
||||
props.put(ZkStateReader.STATE_PROP, state.toString());
|
||||
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
|
||||
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
|
||||
props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
|
||||
@ -1250,7 +1248,7 @@ public final class ZkController {
|
||||
|
||||
if (configLocation != null) {
|
||||
synchronized (confDirectoryListeners) {
|
||||
log.info("This conf directory is no more watched {0}", configLocation);
|
||||
log.info("This conf directory is no more watched {}", configLocation);
|
||||
confDirectoryListeners.remove(configLocation);
|
||||
}
|
||||
}
|
||||
@ -1498,7 +1496,7 @@ public final class ZkController {
|
||||
cloudDesc.setCoreNodeName(coreNodeName);
|
||||
}
|
||||
|
||||
publish(cd, ZkStateReader.DOWN, false, true);
|
||||
publish(cd, Replica.State.DOWN, false, true);
|
||||
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
|
||||
if (collection != null && collection.getStateFormat() > 1) {
|
||||
log.info("Registering watch for external collection {}", cd.getCloudDescriptor().getCollectionName());
|
||||
@ -1600,7 +1598,7 @@ public final class ZkController {
|
||||
|
||||
// detect if this core is in leader-initiated recovery and if so,
|
||||
// then we don't need the leader to wait on seeing the down state
|
||||
String lirState = null;
|
||||
Replica.State lirState = null;
|
||||
try {
|
||||
lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
|
||||
} catch (Exception exc) {
|
||||
@ -1623,7 +1621,7 @@ public final class ZkController {
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(getNodeName());
|
||||
prepCmd.setCoreNodeName(coreZkNodeName);
|
||||
prepCmd.setState(ZkStateReader.DOWN);
|
||||
prepCmd.setState(Replica.State.DOWN);
|
||||
|
||||
// let's retry a couple times - perhaps the leader just went down,
|
||||
// or perhaps he is just not quite ready for us yet
|
||||
@ -1941,7 +1939,7 @@ public final class ZkController {
|
||||
// we only really need to try to send the recovery command if the node itself is "live"
|
||||
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
|
||||
// create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName, retryOnConnLoss);
|
||||
updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName);
|
||||
replicasInLeaderInitiatedRecovery.put(replicaUrl,
|
||||
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
|
||||
log.info("Put replica core={} coreNodeName={} on " +
|
||||
@ -1960,14 +1958,14 @@ public final class ZkController {
|
||||
if (publishDownState || forcePublishState) {
|
||||
String replicaCoreName = replicaCoreProps.getCoreName();
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
|
||||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
|
||||
ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
|
||||
ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
|
||||
ZkStateReader.SHARD_ID_PROP, shardId,
|
||||
ZkStateReader.COLLECTION_PROP, collection);
|
||||
log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
|
||||
replicaCoreName, replicaCoreNodeName, ZkStateReader.DOWN, replicaUrl);
|
||||
replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
|
||||
overseerJobQueue.offer(ZkStateReader.toJSON(m));
|
||||
}
|
||||
|
||||
@ -1988,9 +1986,13 @@ public final class ZkController {
|
||||
}
|
||||
}
|
||||
|
||||
public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
|
||||
Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
|
||||
return (stateObj != null) ? (String) stateObj.get("state") : null;
|
||||
public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
|
||||
final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
|
||||
if (stateObj == null) {
|
||||
return null;
|
||||
}
|
||||
final String stateStr = (String) stateObj.get(ZkStateReader.STATE_PROP);
|
||||
return stateStr == null ? null : Replica.State.getState(stateStr);
|
||||
}
|
||||
|
||||
public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
|
||||
@ -2037,17 +2039,18 @@ public final class ZkController {
|
||||
return stateObj;
|
||||
}
|
||||
|
||||
private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
|
||||
String leaderCoreNodeName, boolean retryOnConnLoss) {
|
||||
private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
|
||||
Replica.State state, String leaderCoreNodeName) {
|
||||
if (collection == null || shardId == null || coreNodeName == null) {
|
||||
log.warn("Cannot set leader-initiated recovery state znode to " + state + " using: collection=" + collection +
|
||||
"; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
|
||||
log.warn("Cannot set leader-initiated recovery state znode to "
|
||||
+ state.toString() + " using: collection=" + collection
|
||||
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
|
||||
return; // if we don't have complete data about a core in cloud mode, do nothing
|
||||
}
|
||||
|
||||
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
|
||||
|
||||
if (ZkStateReader.ACTIVE.equals(state)) {
|
||||
if (state == Replica.State.ACTIVE) {
|
||||
// since we're marking it active, we don't need this znode anymore, so delete instead of update
|
||||
try {
|
||||
zkClient.delete(znodePath, -1, false);
|
||||
@ -2066,7 +2069,7 @@ public final class ZkController {
|
||||
if (stateObj == null)
|
||||
stateObj = ZkNodeProps.makeMap();
|
||||
|
||||
stateObj.put("state", state);
|
||||
stateObj.put(ZkStateReader.STATE_PROP, state.toString());
|
||||
// only update the createdBy value if it's not set
|
||||
if (stateObj.get("createdByNodeName") == null)
|
||||
stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
|
||||
@ -2074,7 +2077,7 @@ public final class ZkController {
|
||||
byte[] znodeData = ZkStateReader.toJSON(stateObj);
|
||||
|
||||
try {
|
||||
if (ZkStateReader.DOWN.equals(state)) {
|
||||
if (state == Replica.State.DOWN) {
|
||||
markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
|
||||
} else {
|
||||
if (zkClient.exists(znodePath, true)) {
|
||||
@ -2083,13 +2086,13 @@ public final class ZkController {
|
||||
zkClient.makePath(znodePath, znodeData, true);
|
||||
}
|
||||
}
|
||||
log.info("Wrote " + state + " to " + znodePath);
|
||||
log.info("Wrote {} to {}", state.toString(), znodePath);
|
||||
} catch (Exception exc) {
|
||||
if (exc instanceof SolrException) {
|
||||
throw (SolrException) exc;
|
||||
} else {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR,
|
||||
"Failed to update data to " + state + " for znode: " + znodePath, exc);
|
||||
"Failed to update data to " + state.toString() + " for znode: " + znodePath, exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +314,7 @@ public class ReplicaMutator {
|
||||
DocCollection collection = prevState.getCollectionOrNull(collectionName);
|
||||
if (slice != null) {
|
||||
collection = prevState.getCollection(collectionName);
|
||||
collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replicaProps);
|
||||
collection = checkAndCompleteShardSplit(prevState, collection, coreNodeName, sliceName, replica);
|
||||
// get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
|
||||
slice = collection.getSlice(sliceName);
|
||||
sliceProps = slice.getProperties();
|
||||
@ -355,19 +355,19 @@ public class ReplicaMutator {
|
||||
return updateState(clusterState, message);
|
||||
}
|
||||
|
||||
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Map<String, Object> replicaProps) {
|
||||
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
|
||||
Slice slice = collection.getSlice(sliceName);
|
||||
Map<String, Object> sliceProps = slice.getProperties();
|
||||
if (slice.getState() == Slice.State.RECOVERY) {
|
||||
log.info("Shard: {} is in recovery state", sliceName);
|
||||
// is this replica active?
|
||||
if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
|
||||
if (replica.getState() == Replica.State.ACTIVE) {
|
||||
log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
|
||||
// are all other replicas also active?
|
||||
boolean allActive = true;
|
||||
for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
|
||||
if (coreNodeName.equals(entry.getKey())) continue;
|
||||
if (!ZkStateReader.ACTIVE.equals(entry.getValue().getStr(ZkStateReader.STATE_PROP))) {
|
||||
if (entry.getValue().getState() != Replica.State.ACTIVE) {
|
||||
allActive = false;
|
||||
break;
|
||||
}
|
||||
@ -387,7 +387,7 @@ public class ReplicaMutator {
|
||||
log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
|
||||
// this is a fellow sub shard so check if all replicas are active
|
||||
for (Map.Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
|
||||
if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP))) {
|
||||
if (sliceEntry.getValue().getState() != Replica.State.ACTIVE) {
|
||||
allActive = false;
|
||||
break outer;
|
||||
}
|
||||
|
@ -136,7 +136,7 @@ public class JarRepository {
|
||||
List<Replica> replicas = new ArrayList<>(slice.getReplicasMap().values());
|
||||
Collections.shuffle(replicas, RANDOM);
|
||||
for (Replica r : replicas) {
|
||||
if (ZkStateReader.ACTIVE.equals(r.getStr(ZkStateReader.STATE_PROP))) {
|
||||
if (r.getState() == Replica.State.ACTIVE) {
|
||||
if(zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))){
|
||||
replica = r;
|
||||
break;
|
||||
|
@ -17,19 +17,6 @@ package org.apache.solr.core;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
|
||||
import org.apache.solr.cloud.SolrZkServer;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -41,6 +28,19 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
|
||||
import org.apache.solr.cloud.SolrZkServer;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ZkContainer {
|
||||
protected static Logger log = LoggerFactory.getLogger(ZkContainer.class);
|
||||
|
||||
@ -185,7 +185,7 @@ public class ZkContainer {
|
||||
SolrException.log(log, "", e);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
|
||||
} catch (InterruptedException e1) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("", e1);
|
||||
@ -215,7 +215,7 @@ public class ZkContainer {
|
||||
|
||||
for (SolrCore core : cores) {
|
||||
try {
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
|
||||
zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
|
||||
} catch (KeeperException e) {
|
||||
CoreContainer.log.error("", e);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -722,8 +722,7 @@ public class SolrConfigHandler extends RequestHandlerBase {
|
||||
if (replicasMap != null) {
|
||||
for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
|
||||
Replica replica = entry.getValue();
|
||||
if (ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP)) &&
|
||||
liveNodes.contains(replica.getNodeName())) {
|
||||
if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
|
||||
activeReplicaCoreUrls.add(replica.getCoreUrl());
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,14 @@ package org.apache.solr.handler.admin;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.*;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
|
||||
import static org.apache.solr.common.cloud.DocCollection.*;
|
||||
import static org.apache.solr.common.cloud.ZkNodeProps.*;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.*;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
|
||||
import static org.apache.solr.common.params.CommonParams.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
@ -28,7 +36,6 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
@ -70,55 +77,7 @@ import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
|
||||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
public class CollectionsHandler extends RequestHandlerBase {
|
||||
protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
|
||||
@ -357,7 +316,7 @@ public class CollectionsHandler extends RequestHandlerBase {
|
||||
}
|
||||
|
||||
// We're the preferred leader, but someone else is leader. Only become leader if we're active.
|
||||
if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
|
||||
if (replica.getState() != Replica.State.ACTIVE) {
|
||||
NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
|
||||
if (inactives == null) {
|
||||
inactives = new NamedList<>();
|
||||
|
@ -867,7 +867,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||
|
||||
String nodeName = params.get("nodeName");
|
||||
String coreNodeName = params.get("coreNodeName");
|
||||
String waitForState = params.get("state");
|
||||
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
|
||||
Boolean checkLive = params.getBool("checkLive");
|
||||
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
|
||||
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
|
||||
@ -877,7 +877,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||
+ ", onlyIfLeaderActive: "+onlyIfLeaderActive);
|
||||
|
||||
int maxTries = 0;
|
||||
String state = null;
|
||||
Replica.State state = null;
|
||||
boolean live = false;
|
||||
int retry = 0;
|
||||
while (true) {
|
||||
@ -918,41 +918,39 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||
cloudDescriptor.getCollectionName() + ") have state: " + waitForState);
|
||||
}
|
||||
|
||||
ClusterState clusterState = coreContainer.getZkController()
|
||||
.getClusterState();
|
||||
ClusterState clusterState = coreContainer.getZkController().getClusterState();
|
||||
String collection = cloudDescriptor.getCollectionName();
|
||||
Slice slice = clusterState.getSlice(collection,
|
||||
cloudDescriptor.getShardId());
|
||||
Slice slice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
|
||||
if (slice != null) {
|
||||
ZkNodeProps nodeProps = slice.getReplicasMap().get(coreNodeName);
|
||||
if (nodeProps != null) {
|
||||
state = nodeProps.getStr(ZkStateReader.STATE_PROP);
|
||||
final Replica replica = slice.getReplicasMap().get(coreNodeName);
|
||||
if (replica != null) {
|
||||
state = replica.getState();
|
||||
live = clusterState.liveNodesContain(nodeName);
|
||||
|
||||
String localState = cloudDescriptor.getLastPublished();
|
||||
final Replica.State localState = cloudDescriptor.getLastPublished();
|
||||
|
||||
// TODO: This is funky but I've seen this in testing where the replica asks the
|
||||
// leader to be in recovery? Need to track down how that happens ... in the meantime,
|
||||
// this is a safeguard
|
||||
boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
|
||||
onlyIfLeader &&
|
||||
core.getName().equals(nodeProps.getStr("core")) &&
|
||||
ZkStateReader.RECOVERING.equals(waitForState) &&
|
||||
ZkStateReader.ACTIVE.equals(localState) &&
|
||||
ZkStateReader.ACTIVE.equals(state));
|
||||
core.getName().equals(replica.getStr("core")) &&
|
||||
waitForState == Replica.State.RECOVERING &&
|
||||
localState == Replica.State.ACTIVE &&
|
||||
state == Replica.State.ACTIVE);
|
||||
|
||||
if (leaderDoesNotNeedRecovery) {
|
||||
log.warn("Leader "+core.getName()+" ignoring request to be in the recovering state because it is live and active.");
|
||||
}
|
||||
|
||||
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && (localState == null || !localState.equals(ZkStateReader.ACTIVE));
|
||||
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
|
||||
log.info("In WaitForState("+waitForState+"): collection="+collection+", shard="+slice.getName()+
|
||||
", thisCore="+core.getName()+", leaderDoesNotNeedRecovery="+leaderDoesNotNeedRecovery+
|
||||
", isLeader? "+core.getCoreDescriptor().getCloudDescriptor().isLeader()+
|
||||
", live="+live+", checkLive="+checkLive+", currentState="+state+", localState="+localState+", nodeName="+nodeName+
|
||||
", coreNodeName="+coreNodeName+", onlyIfActiveCheckResult="+onlyIfActiveCheckResult+", nodeProps: "+nodeProps);
|
||||
", live="+live+", checkLive="+checkLive+", currentState="+state.toString()+", localState="+localState+", nodeName="+nodeName+
|
||||
", coreNodeName="+coreNodeName+", onlyIfActiveCheckResult="+onlyIfActiveCheckResult+", nodeProps: "+replica);
|
||||
|
||||
if (!onlyIfActiveCheckResult && nodeProps != null && (state.equals(waitForState) || leaderDoesNotNeedRecovery)) {
|
||||
if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
|
||||
if (checkLive == null) {
|
||||
break;
|
||||
} else if (checkLive && live) {
|
||||
@ -984,7 +982,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||
"I was asked to wait on state " + waitForState + " for "
|
||||
+ shardId + " in " + collection + " on " + nodeName
|
||||
+ " but I still do not see the requested state. I see state: "
|
||||
+ state + " live:" + live + " leader from ZK: " + leaderInfo
|
||||
+ state.toString() + " live:" + live + " leader from ZK: " + leaderInfo
|
||||
);
|
||||
}
|
||||
|
||||
@ -1050,7 +1048,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
|
||||
SolrException.log(log, "Replay failed");
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
|
||||
}
|
||||
coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
|
||||
coreContainer.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
|
||||
rsp.add("core", cname);
|
||||
rsp.add("status", "BUFFER_APPLIED");
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -16,36 +16,6 @@ package org.apache.solr.handler.component;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
@ -61,6 +31,36 @@ import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
public class HttpShardHandler extends ShardHandler {
|
||||
|
||||
private HttpShardHandlerFactory httpShardHandlerFactory;
|
||||
@ -407,8 +407,7 @@ public class HttpShardHandler extends ShardHandler {
|
||||
String ourCollection = cloudDescriptor.getCollectionName();
|
||||
if (rb.slices.length == 1 && rb.slices[0] != null
|
||||
&& ( rb.slices[0].equals(ourSlice) || rb.slices[0].equals(ourCollection + "_" + ourSlice) ) // handle the <collection>_<slice> format
|
||||
&& ZkStateReader.ACTIVE.equals(cloudDescriptor.getLastPublished()) )
|
||||
{
|
||||
&& cloudDescriptor.getLastPublished() == Replica.State.ACTIVE) {
|
||||
boolean shortCircuit = params.getBool("shortCircuit", true); // currently just a debugging parameter to check distrib search on a single node
|
||||
|
||||
String targetHandler = params.get(ShardParams.SHARDS_QT);
|
||||
@ -448,8 +447,9 @@ public class HttpShardHandler extends ShardHandler {
|
||||
boolean first = true;
|
||||
for (Replica replica : sliceShards.values()) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName())
|
||||
|| !replica.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE)) continue;
|
||||
|| replica.getState() != Replica.State.ACTIVE) {
|
||||
continue;
|
||||
}
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
|
@ -41,8 +41,8 @@ import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.ShardParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
@ -560,7 +560,7 @@ public class RealTimeGetComponent extends SearchComponent
|
||||
boolean onlyIfActive = rb.req.getParams().getBool("onlyIfActive", false);
|
||||
|
||||
if (onlyIfActive) {
|
||||
if (!rb.req.getCore().getCoreDescriptor().getCloudDescriptor().getLastPublished().equals(ZkStateReader.ACTIVE)) {
|
||||
if (rb.req.getCore().getCoreDescriptor().getCloudDescriptor().getLastPublished() != Replica.State.ACTIVE) {
|
||||
log.info("Last published state was not ACTIVE, cannot sync.");
|
||||
rb.rsp.add("sync", "false");
|
||||
return;
|
||||
|
@ -294,7 +294,7 @@ public final class ManagedIndexSchema extends IndexSchema {
|
||||
for (Map.Entry<String, Replica> entry : replicasMap.entrySet()) {
|
||||
Replica replica = entry.getValue();
|
||||
if (!localCoreNodeName.equals(replica.getName()) &&
|
||||
ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP)) &&
|
||||
replica.getState() == Replica.State.ACTIVE &&
|
||||
liveNodes.contains(replica.getNodeName())) {
|
||||
ZkCoreNodeProps replicaCoreProps = new ZkCoreNodeProps(replica);
|
||||
activeReplicaCoreUrls.add(replicaCoreProps.getCoreUrl());
|
||||
|
@ -50,7 +50,6 @@ import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
@ -162,11 +161,10 @@ public class JoinQParserPlugin extends QParserPlugin {
|
||||
fromReplica = replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||
|
||||
// found local replica, but is it Active?
|
||||
ZkCoreNodeProps replicaCoreProps = new ZkCoreNodeProps(replica);
|
||||
if (!ZkStateReader.ACTIVE.equals(replicaCoreProps.getState()))
|
||||
if (replica.getState() != Replica.State.ACTIVE)
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
|
||||
"SolrCloud join: "+fromIndex+" has a local replica ("+fromReplica+
|
||||
") on "+nodeName+", but it is "+replicaCoreProps.getState());
|
||||
") on "+nodeName+", but it is "+replica.getState());
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -17,6 +17,32 @@
|
||||
|
||||
package org.apache.solr.servlet;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.Header;
|
||||
@ -44,7 +70,6 @@ import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
@ -75,31 +100,6 @@ import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
|
||||
import org.apache.solr.util.RTimer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
|
||||
@ -687,24 +687,23 @@ public class SolrDispatchFilter extends BaseSolrFilter {
|
||||
Set<String> liveNodes = clusterState.getLiveNodes();
|
||||
for (Slice slice : slices) {
|
||||
Map<String,Replica> sliceShards = slice.getReplicasMap();
|
||||
for (ZkNodeProps nodeProps : sliceShards.values()) {
|
||||
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
||||
if (!activeReplicas || (liveNodes.contains(coreNodeProps.getNodeName())
|
||||
&& coreNodeProps.getState().equals(ZkStateReader.ACTIVE))) {
|
||||
for (Replica replica : sliceShards.values()) {
|
||||
if (!activeReplicas || (liveNodes.contains(replica.getNodeName())
|
||||
&& replica.getState() == Replica.State.ACTIVE)) {
|
||||
|
||||
if (byCoreName && !collectionName.equals(coreNodeProps.getCoreName())) {
|
||||
if (byCoreName && !collectionName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
|
||||
// if it's by core name, make sure they match
|
||||
continue;
|
||||
}
|
||||
if (coreNodeProps.getBaseUrl().equals(cores.getZkController().getBaseUrl())) {
|
||||
if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(cores.getZkController().getBaseUrl())) {
|
||||
// don't count a local core
|
||||
continue;
|
||||
}
|
||||
|
||||
if (origCorename != null) {
|
||||
coreUrl = coreNodeProps.getBaseUrl() + "/" + origCorename;
|
||||
coreUrl = replica.getStr(ZkStateReader.BASE_URL_PROP) + "/" + origCorename;
|
||||
} else {
|
||||
coreUrl = coreNodeProps.getCoreUrl();
|
||||
coreUrl = replica.getCoreUrl();
|
||||
if (coreUrl.endsWith("/")) {
|
||||
coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.OnReconnect;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
@ -162,17 +163,17 @@ public final class ZookeeperInfoServlet extends BaseSolrServlet {
|
||||
Map<String,Object> replicas = (Map<String,Object>)shard.get("replicas");
|
||||
for (String replicaId : replicas.keySet()) {
|
||||
Map<String,Object> replicaState = (Map<String,Object>)replicas.get(replicaId);
|
||||
String coreState = (String)replicaState.get("state");
|
||||
Replica.State coreState = Replica.State.getState((String)replicaState.get(ZkStateReader.STATE_PROP));
|
||||
String nodeName = (String)replicaState.get("node_name");
|
||||
|
||||
// state can lie to you if the node is offline, so need to reconcile with live_nodes too
|
||||
if (!liveNodes.contains(nodeName))
|
||||
coreState = ZkStateReader.DOWN; // not on a live node, so must be down
|
||||
coreState = Replica.State.DOWN; // not on a live node, so must be down
|
||||
|
||||
if (ZkStateReader.ACTIVE.equals(coreState)) {
|
||||
if (coreState == Replica.State.ACTIVE) {
|
||||
hasActive = true; // assumed no replicas active and found one that is for this shard
|
||||
} else {
|
||||
if (ZkStateReader.RECOVERING.equals(coreState)) {
|
||||
if (coreState == Replica.State.RECOVERING) {
|
||||
replicaInRecovery = true;
|
||||
}
|
||||
isHealthy = false; // assumed healthy and found one replica that is not
|
||||
@ -189,7 +190,7 @@ public final class ZookeeperInfoServlet extends BaseSolrServlet {
|
||||
return !hasDownedShard && !isHealthy; // means no shards offline but not 100% healthy either
|
||||
} else if ("downed_shard".equals(filter)) {
|
||||
return hasDownedShard;
|
||||
} else if (ZkStateReader.RECOVERING.equals(filter)) {
|
||||
} else if (Replica.State.getState(filter) == Replica.State.RECOVERING) {
|
||||
return !isHealthy && replicaInRecovery;
|
||||
}
|
||||
|
||||
|
@ -372,7 +372,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||
slice = coll.getSlice(myShardId);
|
||||
shardId = myShardId;
|
||||
leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
|
||||
List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, shardId, leaderReplica.getName(), null, ZkStateReader.DOWN);
|
||||
List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader()
|
||||
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
|
||||
}
|
||||
}
|
||||
|
||||
@ -390,7 +391,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||
// so get the replicas...
|
||||
forwardToLeader = false;
|
||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
||||
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, ZkStateReader.DOWN);
|
||||
.getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
|
||||
|
||||
if (replicaProps != null) {
|
||||
if (nodes == null) {
|
||||
@ -1391,7 +1392,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
||||
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
|
||||
collection, myShardId);
|
||||
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
|
||||
.getReplicaProps(collection, myShardId, leaderReplica.getName(), null, ZkStateReader.DOWN);
|
||||
.getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN);
|
||||
if (replicaProps != null) {
|
||||
List<Node> myReplicas = new ArrayList<>();
|
||||
for (ZkCoreNodeProps replicaProp : replicaProps) {
|
||||
|
@ -867,7 +867,7 @@ public class SolrCLI {
|
||||
if (replicaHealth.isLeader)
|
||||
hasLeader = true;
|
||||
|
||||
if (!ZkStateReader.ACTIVE.equals(replicaHealth.status)) {
|
||||
if (!Replica.State.ACTIVE.toString().equals(replicaHealth.status)) {
|
||||
healthy = false;
|
||||
} else {
|
||||
atLeastOneActive = true;
|
||||
@ -964,7 +964,7 @@ public class SolrCLI {
|
||||
// if replica's node is not live, its status is DOWN
|
||||
String nodeName = replicaCoreProps.getNodeName();
|
||||
if (nodeName == null || !liveNodes.contains(nodeName)) {
|
||||
replicaStatus = ZkStateReader.DOWN;
|
||||
replicaStatus = Replica.State.DOWN.toString();
|
||||
} else {
|
||||
// query this replica directly to get doc count and assess health
|
||||
q = new SolrQuery("*:*");
|
||||
@ -991,7 +991,7 @@ public class SolrCLI {
|
||||
log.error("ERROR: " + exc + " when trying to reach: " + coreUrl);
|
||||
|
||||
if (checkCommunicationError(exc)) {
|
||||
replicaStatus = "down";
|
||||
replicaStatus = Replica.State.DOWN.toString();
|
||||
} else {
|
||||
replicaStatus = "error: "+exc;
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ public class AssignTest extends SolrTestCaseJ4 {
|
||||
Map<String,Replica> replicas = new HashMap<>();
|
||||
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
|
||||
ZkStateReader.STATE_PROP, "ACTIVE",
|
||||
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, "0.0.0.0",
|
||||
ZkStateReader.CORE_NAME_PROP, "core1",
|
||||
ZkStateReader.ROLES_PROP, null,
|
||||
|
@ -209,9 +209,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
boolean allActive = true;
|
||||
for (Replica replica : replicas) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName())
|
||||
|| !replica.get(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE)) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
|
||||
allActive = false;
|
||||
break;
|
||||
}
|
||||
|
@ -282,7 +282,9 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
||||
attempts++;
|
||||
int activeReplicaCount = 0;
|
||||
for (Replica x : zkStateReader.getClusterState().getCollection(collectionName).getSlice("x").getReplicas()) {
|
||||
if(ZkStateReader.ACTIVE.equals(x.getStr(ZkStateReader.STATE_PROP))) activeReplicaCount++;
|
||||
if (x.getState() == Replica.State.ACTIVE) {
|
||||
activeReplicaCount++;
|
||||
}
|
||||
}
|
||||
Thread.sleep(500);
|
||||
if(activeReplicaCount >= replicationFactor) break;
|
||||
|
@ -107,8 +107,7 @@ public class DeleteInactiveReplicaTest extends AbstractFullDistribZkTestBase{
|
||||
while (System.currentTimeMillis() < endAt) {
|
||||
testcoll = client.getZkStateReader()
|
||||
.getClusterState().getCollection(collectionName);
|
||||
if (!ZkStateReader.ACTIVE.equals(testcoll.getSlice(shard1.getName())
|
||||
.getReplica(replica1.getName()).getStr(ZkStateReader.STATE_PROP))) {
|
||||
if (testcoll.getSlice(shard1.getName()).getReplica(replica1.getName()).getState() != Replica.State.ACTIVE) {
|
||||
success = true;
|
||||
}
|
||||
if (success) break;
|
||||
|
@ -25,6 +25,7 @@ import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ImplicitDocRouter;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.junit.BeforeClass;
|
||||
@ -36,9 +37,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
|
||||
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
|
||||
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
|
||||
@ -83,8 +82,8 @@ public class DeleteLastCustomShardedReplicaTest extends AbstractFullDistribZkTes
|
||||
|
||||
Map<String, Object> props = ZkNodeProps.makeMap(
|
||||
"router.name", ImplicitDocRouter.NAME,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES, 1,
|
||||
SHARDS_PROP,"a,b");
|
||||
|
||||
|
@ -95,7 +95,7 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
|
||||
if (slice.getState() == Slice.State.ACTIVE) {
|
||||
shard1 = slice;
|
||||
for (Replica replica : shard1.getReplicas()) {
|
||||
if (ZkStateReader.ACTIVE.equals(replica.getStr(ZkStateReader.STATE_PROP))) {
|
||||
if (replica.getState() == Replica.State.ACTIVE) {
|
||||
replica1 = replica;
|
||||
break;
|
||||
}
|
||||
|
@ -129,8 +129,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
createCollectionRetry(testCollectionName, 1, 2, 1);
|
||||
cloudClient.setDefaultCollection(testCollectionName);
|
||||
|
||||
Replica leader =
|
||||
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
|
||||
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
|
||||
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
|
||||
|
||||
CoreContainer cores = ((SolrDispatchFilter)leaderJetty.getDispatchFilter().getFilter()).getCores();
|
||||
@ -148,7 +147,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
|
||||
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
|
||||
assertNotNull(lirStateMap);
|
||||
assertEquals(ZkStateReader.DOWN, lirStateMap.get("state"));
|
||||
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
|
||||
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
|
||||
assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
|
||||
|
||||
@ -158,7 +157,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
zkClient.setData(znodePath, "down".getBytes(StandardCharsets.UTF_8), true);
|
||||
lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
|
||||
assertNotNull(lirStateMap);
|
||||
assertEquals(ZkStateReader.DOWN, lirStateMap.get("state"));
|
||||
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
|
||||
zkClient.delete(znodePath, -1, false);
|
||||
|
||||
// try to clean up
|
||||
@ -425,8 +424,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
for (Slice shard : cs.getActiveSlices(testCollectionName)) {
|
||||
if (shard.getName().equals(shardId)) {
|
||||
for (Replica replica : shard.getReplicas()) {
|
||||
String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
|
||||
if (ZkStateReader.ACTIVE.equals(replicaState) || ZkStateReader.RECOVERING.equals(replicaState)) {
|
||||
final Replica.State state = replica.getState();
|
||||
if (state == Replica.State.ACTIVE || state == Replica.State.RECOVERING) {
|
||||
activeReplicas.put(replica.getName(), replica);
|
||||
}
|
||||
}
|
||||
@ -529,9 +528,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
if (!replicasToCheck.contains(replica.getName()))
|
||||
continue;
|
||||
|
||||
String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
|
||||
if (!ZkStateReader.ACTIVE.equals(replicaState)) {
|
||||
log.info("Replica " + replica.getName() + " is currently " + replicaState);
|
||||
final Replica.State state = replica.getState();
|
||||
if (state != Replica.State.ACTIVE) {
|
||||
log.info("Replica " + replica.getName() + " is currently " + state);
|
||||
allReplicasUp = false;
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
||||
|
||||
cloudClient.getZkStateReader().updateClusterState(true); // get the latest state
|
||||
leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
assertEquals("Leader was not active", ZkStateReader.ACTIVE, leader.getStr(ZkStateReader.STATE_PROP));
|
||||
assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
|
||||
|
||||
leaderProxy.reopen();
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
@ -136,7 +136,7 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
|
||||
|
||||
cloudClient.getZkStateReader().updateClusterState(true); // get the latest state
|
||||
leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
|
||||
assertEquals("Leader was not active", ZkStateReader.ACTIVE, leader.getStr(ZkStateReader.STATE_PROP));
|
||||
assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
|
||||
|
||||
leaderProxy.reopen();
|
||||
Thread.sleep(sleepMsBeforeHealPartition);
|
||||
|
@ -17,6 +17,23 @@ package org.apache.solr.cloud;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
@ -47,22 +64,6 @@ import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Slow
|
||||
public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
@ -116,7 +117,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
zkClient.close();
|
||||
}
|
||||
|
||||
public String publishState(String collection, String coreName, String coreNodeName, String stateName, int numShards)
|
||||
public String publishState(String collection, String coreName, String coreNodeName, Replica.State stateName, int numShards)
|
||||
throws KeeperException, InterruptedException, IOException {
|
||||
if (stateName == null) {
|
||||
ElectionContext ec = electionContext.remove(coreName);
|
||||
@ -133,14 +134,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
return null;
|
||||
} else {
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
|
||||
ZkStateReader.STATE_PROP, stateName,
|
||||
ZkStateReader.STATE_PROP, stateName.toString(),
|
||||
ZkStateReader.NODE_NAME_PROP, nodeName,
|
||||
ZkStateReader.CORE_NAME_PROP, coreName,
|
||||
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
|
||||
ZkStateReader.BASE_URL_PROP, "http://" + nodeName
|
||||
+ "/solr/");
|
||||
ZkStateReader.BASE_URL_PROP, "http://" + nodeName + "/solr/");
|
||||
DistributedQueue q = Overseer.getInQueue(zkClient);
|
||||
q.offer(ZkStateReader.toJSON(m));
|
||||
}
|
||||
@ -250,7 +250,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
final int numShards=6;
|
||||
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
|
||||
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), Replica.State.ACTIVE, 3));
|
||||
}
|
||||
Map<String,Replica> rmap = reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap();
|
||||
assertEquals(rmap.toString(), 2, rmap.size());
|
||||
@ -305,7 +305,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
final int numShards=3;
|
||||
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
|
||||
assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), Replica.State.ACTIVE, 3));
|
||||
}
|
||||
|
||||
assertEquals(1, reader.getClusterState().getSlice(collection, "shard1").getReplicasMap().size());
|
||||
@ -319,12 +319,12 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
// publish a bad queue item
|
||||
String emptyCollectionName = "";
|
||||
zkController.publishState(emptyCollectionName, "core0", "node0", ZkStateReader.ACTIVE, 1);
|
||||
zkController.publishState(emptyCollectionName, "core0", "node0", Replica.State.ACTIVE, 1);
|
||||
zkController.publishState(emptyCollectionName, "core0", "node0", null, 1);
|
||||
|
||||
// make sure the Overseer is still processing items
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
|
||||
assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), Replica.State.ACTIVE, 3));
|
||||
}
|
||||
|
||||
assertEquals(1, reader.getClusterState().getSlice("collection2", "shard1").getReplicasMap().size());
|
||||
@ -398,7 +398,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
final String coreName = "core" + slot;
|
||||
|
||||
try {
|
||||
ids[slot]=controllers[slot % nodeCount].publishState(collection, coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
|
||||
ids[slot]=controllers[slot % nodeCount].publishState(collection, coreName, "node" + slot, Replica.State.ACTIVE, sliceCount);
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
fail("register threw exception:" + e.getClass());
|
||||
@ -552,15 +552,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
ZkStateReader.COLLECTION_PROP, "collection1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core1",
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
|
||||
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
|
||||
|
||||
q.offer(ZkStateReader.toJSON(m));
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
assertEquals(reader.getClusterState().toString(), ZkStateReader.RECOVERING,
|
||||
reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap()
|
||||
.get("core_node1").getStr(ZkStateReader.STATE_PROP));
|
||||
assertSame(reader.getClusterState().toString(), Replica.State.RECOVERING,
|
||||
reader.getClusterState().getSlice("collection1", "shard1").getReplica("core_node1").getState());
|
||||
|
||||
//publish node state (active)
|
||||
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
|
||||
@ -569,11 +568,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
ZkStateReader.COLLECTION_PROP, "collection1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core1",
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
|
||||
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
|
||||
|
||||
q.offer(ZkStateReader.toJSON(m));
|
||||
|
||||
verifyStatus(reader, ZkStateReader.ACTIVE);
|
||||
verifyStatus(reader, Replica.State.ACTIVE);
|
||||
|
||||
} finally {
|
||||
|
||||
@ -585,14 +584,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyStatus(ZkStateReader reader, String expectedState) throws InterruptedException {
|
||||
private void verifyStatus(ZkStateReader reader, Replica.State expectedState) throws InterruptedException {
|
||||
int maxIterations = 100;
|
||||
String coreState = null;
|
||||
Replica.State coreState = null;
|
||||
while(maxIterations-->0) {
|
||||
Slice slice = reader.getClusterState().getSlice("collection1", "shard1");
|
||||
if(slice!=null) {
|
||||
coreState = slice.getReplicasMap().get("core_node1").getStr(ZkStateReader.STATE_PROP);
|
||||
if(coreState.equals(expectedState)) {
|
||||
coreState = slice.getReplicasMap().get("core_node1").getState();
|
||||
if(coreState == expectedState) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -646,33 +645,30 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
Thread.sleep(1000);
|
||||
mockController.publishState(collection, "core1", "core_node1",
|
||||
ZkStateReader.RECOVERING, 1);
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
|
||||
waitForCollections(reader, collection);
|
||||
verifyStatus(reader, ZkStateReader.RECOVERING);
|
||||
verifyStatus(reader, Replica.State.RECOVERING);
|
||||
|
||||
int version = getClusterStateVersion(zkClient);
|
||||
|
||||
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.ACTIVE,
|
||||
1);
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.ACTIVE, 1);
|
||||
|
||||
while (version == getClusterStateVersion(zkClient));
|
||||
|
||||
verifyStatus(reader, ZkStateReader.ACTIVE);
|
||||
verifyStatus(reader, Replica.State.ACTIVE);
|
||||
version = getClusterStateVersion(zkClient);
|
||||
overseerClient.close();
|
||||
Thread.sleep(1000); // wait for overseer to get killed
|
||||
|
||||
mockController.publishState(collection, "core1", "core_node1",
|
||||
ZkStateReader.RECOVERING, 1);
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
version = getClusterStateVersion(zkClient);
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
while (version == getClusterStateVersion(zkClient));
|
||||
|
||||
verifyStatus(reader, ZkStateReader.RECOVERING);
|
||||
verifyStatus(reader, Replica.State.RECOVERING);
|
||||
|
||||
assertEquals("Live nodes count does not match", 1, reader
|
||||
.getClusterState().getLiveNodes().size());
|
||||
@ -765,16 +761,16 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
for (int i = 0; i < atLeast(4); i++) {
|
||||
killCounter.incrementAndGet(); //for each round allow 1 kill
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController.publishState(collection, "core1", "node1", "state1",1);
|
||||
mockController.publishState(collection, "core1", "node1", Replica.State.ACTIVE,1);
|
||||
if(mockController2!=null) {
|
||||
mockController2.close();
|
||||
mockController2 = null;
|
||||
}
|
||||
mockController.publishState(collection, "core1", "node1","state2",1);
|
||||
mockController.publishState(collection, "core1", "node1",Replica.State.RECOVERING,1);
|
||||
mockController2 = new MockZKController(server.getZkAddress(), "node2");
|
||||
mockController.publishState(collection, "core1", "node1", "state1",1);
|
||||
mockController.publishState(collection, "core1", "node1", Replica.State.ACTIVE,1);
|
||||
verifyShardLeader(reader, "collection1", "shard1", "core1");
|
||||
mockController2.publishState(collection, "core4", "node2", "state2" ,1);
|
||||
mockController2.publishState(collection, "core4", "node2", Replica.State.ACTIVE ,1);
|
||||
mockController.close();
|
||||
mockController = null;
|
||||
verifyShardLeader(reader, "collection1", "shard1", "core4");
|
||||
@ -820,18 +816,18 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
verifyStatus(reader, ZkStateReader.RECOVERING);
|
||||
verifyStatus(reader, Replica.State.RECOVERING);
|
||||
|
||||
mockController.close();
|
||||
|
||||
int version = getClusterStateVersion(controllerClient);
|
||||
|
||||
mockController = new MockZKController(server.getZkAddress(), "node1");
|
||||
mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
|
||||
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
|
||||
|
||||
while (version == getClusterStateVersion(controllerClient));
|
||||
|
||||
@ -884,7 +880,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
|
||||
mockController.publishState(collection, "core1", "node1", ZkStateReader.RECOVERING, 12);
|
||||
mockController.publishState(collection, "core1", "node1", Replica.State.RECOVERING, 12);
|
||||
|
||||
waitForCollections(reader, "collection1");
|
||||
|
||||
@ -941,7 +937,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
for (int i = 0, j = 0, k = 0; i < MAX_STATE_CHANGES; i++, j++, k++) {
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING,
|
||||
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString(),
|
||||
ZkStateReader.NODE_NAME_PROP, "node1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core" + k,
|
||||
ZkStateReader.CORE_NODE_NAME_PROP, "node1",
|
||||
@ -958,7 +954,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
|
||||
// let's publish a sentinel collection which we'll use to wait for overseer to complete operations
|
||||
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE,
|
||||
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString(),
|
||||
ZkStateReader.NODE_NAME_PROP, "node1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core1",
|
||||
ZkStateReader.CORE_NODE_NAME_PROP, "node1",
|
||||
@ -1062,7 +1058,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
ZkStateReader.COLLECTION_PROP, "collection1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core1",
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
|
||||
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
|
||||
queue.offer(ZkStateReader.toJSON(m));
|
||||
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
|
||||
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
|
||||
@ -1071,7 +1067,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
ZkStateReader.COLLECTION_PROP, "collection1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core2",
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
|
||||
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
|
||||
queue.offer(ZkStateReader.toJSON(m));
|
||||
|
||||
overseerClient = electNewOverseer(server.getZkAddress());
|
||||
@ -1085,7 +1081,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
||||
ZkStateReader.COLLECTION_PROP, "collection1",
|
||||
ZkStateReader.CORE_NAME_PROP, "core3",
|
||||
ZkStateReader.ROLES_PROP, "",
|
||||
ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
|
||||
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
|
||||
queue.offer(ZkStateReader.toJSON(m));
|
||||
|
||||
for(int i=0;i<100;i++) {
|
||||
|
@ -219,7 +219,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||
for (Slice slice : slices) {
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName());
|
||||
boolean active = replica.getStr(ZkStateReader.STATE_PROP).equals(ZkStateReader.ACTIVE);
|
||||
boolean active = replica.getState() == Replica.State.ACTIVE;
|
||||
if (live && active) {
|
||||
liveAndActive++;
|
||||
}
|
||||
|
@ -298,22 +298,22 @@ public class SharedFSAutoReplicaFailoverUtilsTest extends SolrTestCaseJ4 {
|
||||
node = "1";
|
||||
}
|
||||
|
||||
String state = ZkStateReader.ACTIVE;
|
||||
Replica.State state = Replica.State.ACTIVE;
|
||||
String stateCode = m.group(3);
|
||||
|
||||
if (stateCode != null) {
|
||||
switch (stateCode.charAt(0)) {
|
||||
case 'S':
|
||||
state = ZkStateReader.ACTIVE;
|
||||
state = Replica.State.ACTIVE;
|
||||
break;
|
||||
case 'R':
|
||||
state = ZkStateReader.RECOVERING;
|
||||
state = Replica.State.RECOVERING;
|
||||
break;
|
||||
case 'D':
|
||||
state = ZkStateReader.DOWN;
|
||||
state = Replica.State.DOWN;
|
||||
break;
|
||||
case 'F':
|
||||
state = ZkStateReader.RECOVERY_FAILED;
|
||||
state = Replica.State.RECOVERY_FAILED;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
@ -330,7 +330,7 @@ public class SharedFSAutoReplicaFailoverUtilsTest extends SolrTestCaseJ4 {
|
||||
|
||||
replicaPropMap.put(ZkStateReader.NODE_NAME_PROP, nodeName);
|
||||
replicaPropMap.put(ZkStateReader.BASE_URL_PROP, "http://baseUrl" + node);
|
||||
replicaPropMap.put(ZkStateReader.STATE_PROP, state);
|
||||
replicaPropMap.put(ZkStateReader.STATE_PROP, state.toString());
|
||||
|
||||
replica = new Replica(replicaName, replicaPropMap);
|
||||
|
||||
|
@ -243,9 +243,7 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
boolean allActive = true;
|
||||
for (Replica replica : replicas) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName())
|
||||
|| !replica.get(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE)) {
|
||||
if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
|
||||
allActive = false;
|
||||
break;
|
||||
}
|
||||
|
@ -1008,7 +1008,7 @@ public class CloudSolrClient extends SolrClient {
|
||||
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
|
||||
String node = coreNodeProps.getNodeName();
|
||||
if (!liveNodes.contains(coreNodeProps.getNodeName())
|
||||
|| !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
|
||||
|| Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) continue;
|
||||
if (nodes.put(node, nodeProps) == null) {
|
||||
if (!sendToLeaders || coreNodeProps.isLeader()) {
|
||||
String url;
|
||||
|
@ -21,6 +21,7 @@ import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||
@ -164,7 +165,7 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
|
||||
public static class WaitForState extends CoreAdminRequest {
|
||||
protected String nodeName;
|
||||
protected String coreNodeName;
|
||||
protected String state;
|
||||
protected Replica.State state;
|
||||
protected Boolean checkLive;
|
||||
protected Boolean onlyIfLeader;
|
||||
protected Boolean onlyIfLeaderActive;
|
||||
@ -189,11 +190,11 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
|
||||
this.coreNodeName = coreNodeName;
|
||||
}
|
||||
|
||||
public String getState() {
|
||||
public Replica.State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public void setState(String state) {
|
||||
public void setState(Replica.State state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@ -236,7 +237,7 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
|
||||
}
|
||||
|
||||
if (state != null) {
|
||||
params.set( "state", state);
|
||||
params.set(ZkStateReader.STATE_PROP, state.toString());
|
||||
}
|
||||
|
||||
if (checkLive != null) {
|
||||
|
@ -79,10 +79,9 @@ public class ClusterStateUtil {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
// on a live node?
|
||||
boolean live = clusterState.liveNodesContain(replica
|
||||
.getNodeName());
|
||||
String state = replica.getStr(ZkStateReader.STATE_PROP);
|
||||
if (!live || !state.equals(ZkStateReader.ACTIVE)) {
|
||||
final boolean live = clusterState.liveNodesContain(replica.getNodeName());
|
||||
final boolean isActive = replica.getState() == Replica.State.ACTIVE;
|
||||
if (!live || !isActive) {
|
||||
// fail
|
||||
success = false;
|
||||
}
|
||||
|
@ -17,21 +17,87 @@ package org.apache.solr.common.cloud;
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.noggit.JSONUtil;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.*;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
import org.noggit.JSONUtil;
|
||||
|
||||
public class Replica extends ZkNodeProps {
|
||||
|
||||
/**
|
||||
* The replica's state. In general, if the node the replica is hosted on is
|
||||
* not under {@code /live_nodes} in ZK, the replica's state should be
|
||||
* discarded.
|
||||
*/
|
||||
public enum State {
|
||||
|
||||
/**
|
||||
* The replica is ready to receive updates and queries.
|
||||
* <p>
|
||||
* <b>NOTE</b>: when the node the replica is hosted on crashes, the
|
||||
* replica's state may remain ACTIVE in ZK. To determine if the replica is
|
||||
* truly active, you must also verify that its {@link Replica#getNodeName()
|
||||
* node} is under {@code /live_nodes} in ZK (or use
|
||||
* {@link ClusterState#liveNodesContain(String)}).
|
||||
* </p>
|
||||
*/
|
||||
ACTIVE,
|
||||
|
||||
/**
|
||||
* The first state before {@link State#RECOVERING}. A node in this state
|
||||
* should be actively trying to move to {@link State#RECOVERING}.
|
||||
* <p>
|
||||
* <b>NOTE</b>: a replica's state may appear DOWN in ZK also when the node
|
||||
* it's hosted on gracefully shuts down. This is a best effort though, and
|
||||
* should not be relied on.
|
||||
* </p>
|
||||
*/
|
||||
DOWN,
|
||||
|
||||
/**
|
||||
* The node is recovering from the leader. This might involve peer-sync,
|
||||
* full replication or finding out things are already in sync.
|
||||
*/
|
||||
RECOVERING,
|
||||
|
||||
/**
|
||||
* Recovery attempts have not worked, something is not right.
|
||||
* <p>
|
||||
* <b>NOTE</b>: This state doesn't matter if the node is not part of
|
||||
* {@code /live_nodes} in ZK; in that case the node is not part of the
|
||||
* cluster and it's state should be discarded.
|
||||
* </p>
|
||||
*/
|
||||
RECOVERY_FAILED;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
/** Converts the state string to a State instance. */
|
||||
public static State getState(String stateStr) {
|
||||
return stateStr == null ? null : State.valueOf(stateStr.toUpperCase(Locale.ROOT));
|
||||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final String nodeName;
|
||||
private final State state;
|
||||
|
||||
public Replica(String name, Map<String,Object> propMap) {
|
||||
super(propMap);
|
||||
this.name = name;
|
||||
nodeName = (String)propMap.get(ZkStateReader.NODE_NAME_PROP);
|
||||
this.nodeName = (String) propMap.get(ZkStateReader.NODE_NAME_PROP);
|
||||
if (propMap.get(ZkStateReader.STATE_PROP) != null) {
|
||||
this.state = State.getState((String) propMap.get(ZkStateReader.STATE_PROP));
|
||||
} else {
|
||||
this.state = State.ACTIVE; //Default to ACTIVE
|
||||
propMap.put(ZkStateReader.STATE_PROP, state.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
@ -45,6 +111,11 @@ public class Replica extends ZkNodeProps {
|
||||
public String getNodeName() {
|
||||
return nodeName;
|
||||
}
|
||||
|
||||
/** Returns the {@link State} of this replica. */
|
||||
public State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
@ -86,12 +86,6 @@ public class ZkStateReader implements Closeable {
|
||||
|
||||
public static final String ROLES = "/roles.json";
|
||||
|
||||
public static final String RECOVERING = "recovering";
|
||||
public static final String RECOVERY_FAILED = "recovery_failed";
|
||||
public static final String ACTIVE = "active";
|
||||
public static final String DOWN = "down";
|
||||
public static final String SYNC = "sync";
|
||||
|
||||
public static final String CONFIGS_ZKNODE = "/configs";
|
||||
public final static String CONFIGNAME_PROP="configName";
|
||||
|
||||
@ -696,18 +690,17 @@ public class ZkStateReader implements Closeable {
|
||||
}
|
||||
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection,
|
||||
String shardId, String thisCoreNodeName) {
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
|
||||
return getReplicaProps(collection, shardId, thisCoreNodeName, null);
|
||||
}
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection,
|
||||
String shardId, String thisCoreNodeName, String mustMatchStateFilter) {
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
|
||||
Replica.State mustMatchStateFilter) {
|
||||
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
|
||||
}
|
||||
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection,
|
||||
String shardId, String thisCoreNodeName, String mustMatchStateFilter, String mustNotMatchStateFilter) {
|
||||
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
|
||||
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
|
||||
assert thisCoreNodeName != null;
|
||||
ClusterState clusterState = this.clusterState;
|
||||
if (clusterState == null) {
|
||||
@ -733,8 +726,8 @@ public class ZkStateReader implements Closeable {
|
||||
String coreNodeName = entry.getValue().getName();
|
||||
|
||||
if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
|
||||
if (mustMatchStateFilter == null || mustMatchStateFilter.equals(nodeProps.getState())) {
|
||||
if (mustNotMatchStateFilter == null || !mustNotMatchStateFilter.equals(nodeProps.getState())) {
|
||||
if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
|
||||
if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
|
||||
nodes.add(nodeProps);
|
||||
}
|
||||
}
|
||||
|
@ -154,11 +154,9 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
||||
+ shard.getValue().getStr(ZkStateReader.STATE_PROP)
|
||||
+ " live:"
|
||||
+ clusterState.liveNodesContain(shard.getValue().getNodeName()));
|
||||
String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
|
||||
if ((state.equals(ZkStateReader.RECOVERING) || state
|
||||
.equals(ZkStateReader.SYNC) || state.equals(ZkStateReader.DOWN))
|
||||
&& clusterState.liveNodesContain(shard.getValue().getStr(
|
||||
ZkStateReader.NODE_NAME_PROP))) {
|
||||
final Replica.State state = shard.getValue().getState();
|
||||
if ((state == Replica.State.RECOVERING || state == Replica.State.DOWN)
|
||||
&& clusterState.liveNodesContain(shard.getValue().getStr(ZkStateReader.NODE_NAME_PROP))) {
|
||||
sawLiveRecovering = true;
|
||||
}
|
||||
}
|
||||
@ -199,9 +197,9 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
|
||||
Map<String,Replica> shards = entry.getValue().getReplicasMap();
|
||||
for (Map.Entry<String,Replica> shard : shards.entrySet()) {
|
||||
|
||||
String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
|
||||
if (!state.equals(ZkStateReader.ACTIVE)) {
|
||||
fail("Not all shards are ACTIVE - found a shard that is: " + state);
|
||||
final Replica.State state = shard.getValue().getState();
|
||||
if (state != Replica.State.ACTIVE) {
|
||||
fail("Not all shards are ACTIVE - found a shard that is: " + state.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1079,7 +1079,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
for (CloudJettyRunner cjetty : solrJetties) {
|
||||
ZkNodeProps props = cjetty.info;
|
||||
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
|
||||
boolean active = props.getStr(ZkStateReader.STATE_PROP).equals(ZkStateReader.ACTIVE);
|
||||
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
|
||||
boolean live = zkStateReader.getClusterState().liveNodesContain(nodeName);
|
||||
if (active && live) {
|
||||
shardClients.add(cjetty.client.solrClient);
|
||||
@ -1169,8 +1169,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
if (verbose) System.err.println(" live:" + live);
|
||||
if (verbose) System.err.println(" num:" + num + "\n");
|
||||
|
||||
boolean active = props.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE);
|
||||
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
|
||||
if (active && live) {
|
||||
if (lastNum > -1 && lastNum != num && failMessage == null) {
|
||||
failMessage = shard + " is not consistent. Got " + lastNum + " from " + lastJetty.url + "lastClient"
|
||||
@ -1315,8 +1314,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
CloudJettyRunner cjetty = shardToJetty.get(s).get(i);
|
||||
ZkNodeProps props = cjetty.info;
|
||||
SolrClient client = cjetty.client.solrClient;
|
||||
boolean active = props.getStr(ZkStateReader.STATE_PROP).equals(
|
||||
ZkStateReader.ACTIVE);
|
||||
boolean active = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP)) == Replica.State.ACTIVE;
|
||||
if (active) {
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.set("distrib", false);
|
||||
@ -1400,11 +1398,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
}
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
long count = 0;
|
||||
String currentState = cjetty.info.getStr(ZkStateReader.STATE_PROP);
|
||||
if (currentState != null
|
||||
&& currentState.equals(ZkStateReader.ACTIVE)
|
||||
&& zkStateReader.getClusterState().liveNodesContain(
|
||||
cjetty.info.getStr(ZkStateReader.NODE_NAME_PROP))) {
|
||||
final Replica.State currentState = Replica.State.getState(cjetty.info.getStr(ZkStateReader.STATE_PROP));
|
||||
if (currentState == Replica.State.ACTIVE
|
||||
&& zkStateReader.getClusterState().liveNodesContain(cjetty.info.getStr(ZkStateReader.NODE_NAME_PROP))) {
|
||||
SolrQuery query = new SolrQuery("*:*");
|
||||
query.set("distrib", false);
|
||||
count = client.solrClient.query(query).getResults().getNumFound();
|
||||
@ -1591,9 +1587,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
String shardNames = (String) collectionProps.get(SHARDS_PROP);
|
||||
numShards = StrUtils.splitSmart(shardNames,',').size();
|
||||
}
|
||||
Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
|
||||
Integer replicationFactor = (Integer) collectionProps.get(ZkStateReader.REPLICATION_FACTOR);
|
||||
if(replicationFactor==null){
|
||||
replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
|
||||
replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
|
||||
}
|
||||
|
||||
if (confSetName != null) {
|
||||
@ -1634,9 +1630,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
return createCollection(collectionInfos, collectionName,
|
||||
ZkNodeProps.makeMap(
|
||||
NUM_SLICES, numShards,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
|
||||
CREATE_NODE_SET, createNodeSetStr,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode),
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
|
||||
client);
|
||||
}
|
||||
|
||||
@ -1646,9 +1642,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
return createCollection(collectionInfos, collectionName,
|
||||
ZkNodeProps.makeMap(
|
||||
NUM_SLICES, numShards,
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
|
||||
CREATE_NODE_SET, createNodeSetStr,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode),
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
|
||||
client, configName);
|
||||
}
|
||||
|
||||
@ -1861,8 +1857,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
|
||||
|
||||
Map<String, Object> props = makeMap(
|
||||
REPLICATION_FACTOR, replicationFactor,
|
||||
MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
|
||||
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
|
||||
NUM_SLICES, numShards);
|
||||
Map<String,List<Integer>> collectionInfos = new HashMap<>();
|
||||
createCollection(collectionInfos, collName, props, client);
|
||||
@ -1903,9 +1899,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
|
||||
// ensure all replicas are "active" and identify the non-leader replica
|
||||
for (Replica replica : replicas) {
|
||||
String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
|
||||
if (!ZkStateReader.ACTIVE.equals(replicaState)) {
|
||||
log.info("Replica " + replica.getName() + " is currently " + replicaState);
|
||||
if (replica.getState() != Replica.State.ACTIVE) {
|
||||
log.info("Replica {} is currently {}", replica.getName(), replica.getState());
|
||||
allReplicasUp = false;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.cloud.AbstractFullDistribZkTestBase.CloudJettyRunner;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
@ -398,8 +399,7 @@ public class ChaosMonkey {
|
||||
return cjetty;
|
||||
}
|
||||
|
||||
private int checkIfKillIsLegal(String slice, int numActive)
|
||||
throws KeeperException, InterruptedException {
|
||||
private int checkIfKillIsLegal(String slice, int numActive) throws KeeperException, InterruptedException {
|
||||
for (CloudJettyRunner cloudJetty : shardToJetty.get(slice)) {
|
||||
|
||||
// get latest cloud state
|
||||
@ -413,11 +413,11 @@ public class ChaosMonkey {
|
||||
throw new RuntimeException("shard name " + cloudJetty.coreNodeName + " not found in " + theShards.getReplicasMap().keySet());
|
||||
}
|
||||
|
||||
String state = props.getStr(ZkStateReader.STATE_PROP);
|
||||
String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
|
||||
final Replica.State state = Replica.State.getState(props.getStr(ZkStateReader.STATE_PROP));
|
||||
final String nodeName = props.getStr(ZkStateReader.NODE_NAME_PROP);
|
||||
|
||||
if (cloudJetty.jetty.isRunning()
|
||||
&& state.equals(ZkStateReader.ACTIVE)
|
||||
&& state == Replica.State.ACTIVE
|
||||
&& zkStateReader.getClusterState().liveNodesContain(nodeName)) {
|
||||
numActive++;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user