mirror of https://github.com/apache/lucene.git
SOLR-10397: Remove old implementation of autoAddReplicas features
This commit is contained in:
parent
b53736155a
commit
0f7e3be589
|
@ -191,6 +191,9 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
if (instanceDir != null) {
|
||||
params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
|
||||
}
|
||||
if (coreNodeName != null) {
|
||||
params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
|
||||
}
|
||||
ocmh.addPropertyParams(message, params);
|
||||
|
||||
// For tracking async calls.
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.solr.common.cloud.Replica;
|
|||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
|
@ -49,7 +48,8 @@ public class CloudUtil {
|
|||
* + throw exception if it has been.
|
||||
*/
|
||||
public static void checkSharedFSFailoverReplaced(CoreContainer cc, CoreDescriptor desc) {
|
||||
|
||||
if (!cc.isSharedFs(desc)) return;
|
||||
|
||||
ZkController zkController = cc.getZkController();
|
||||
String thisCnn = zkController.getCoreNodeName(desc);
|
||||
String thisBaseUrl = zkController.getBaseUrl();
|
||||
|
@ -65,11 +65,10 @@ public class CloudUtil {
|
|||
|
||||
String cnn = replica.getName();
|
||||
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
boolean isSharedFs = replica.getStr(CoreAdminParams.DATA_DIR) != null;
|
||||
log.debug("compare against coreNodeName={} baseUrl={}", cnn, baseUrl);
|
||||
|
||||
if (thisCnn != null && thisCnn.equals(cnn)
|
||||
&& !thisBaseUrl.equals(baseUrl) && isSharedFs) {
|
||||
&& !thisBaseUrl.equals(baseUrl)) {
|
||||
if (cc.getLoadedCoreNames().contains(desc.getName())) {
|
||||
cc.unload(desc.getName());
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScaling;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.common.cloud.ReplicaPosition;
|
||||
|
@ -110,7 +109,6 @@ public class CreateCollectionCmd implements Cmd {
|
|||
int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
|
||||
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
|
||||
int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
|
||||
boolean autoAddReplicas = message.getBool(AUTO_ADD_REPLICAS, false);
|
||||
Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
|
||||
String policy = message.getStr(Policy.POLICY);
|
||||
boolean usePolicyFramework = autoScalingJson.get(Policy.CLUSTER_POLICY) != null || policy != null;
|
||||
|
@ -318,9 +316,6 @@ public class CreateCollectionCmd implements Cmd {
|
|||
ocmh.cleanupCollection(collectionName, new NamedList());
|
||||
log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
|
||||
} else {
|
||||
if (autoAddReplicas) {
|
||||
ocmh.forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
|
||||
}
|
||||
log.debug("Finished create command on all shards for collection: {}", collectionName);
|
||||
|
||||
// Emit a warning about production use of data driven functionality
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.solr.client.solrj.SolrResponse;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScaling;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
|
||||
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
|
||||
import org.apache.solr.cloud.overseer.ClusterStateMutator;
|
||||
import org.apache.solr.cloud.overseer.CollectionMutator;
|
||||
|
@ -39,17 +41,22 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
|
|||
import org.apache.solr.cloud.overseer.SliceMutator;
|
||||
import org.apache.solr.cloud.overseer.ZkStateWriter;
|
||||
import org.apache.solr.cloud.overseer.ZkWriteCommand;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.ContentStreamBase;
|
||||
import org.apache.solr.common.util.IOUtils;
|
||||
import org.apache.solr.common.util.ObjectReleaseTracker;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.solr.handler.admin.CollectionsHandler;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -470,8 +477,6 @@ public class Overseer implements Closeable {
|
|||
private OverseerThread ccThread;
|
||||
|
||||
private OverseerThread updaterThread;
|
||||
|
||||
private OverseerThread arfoThread;
|
||||
|
||||
private OverseerThread triggerThread;
|
||||
|
||||
|
@ -524,12 +529,11 @@ public class Overseer implements Closeable {
|
|||
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
|
||||
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
|
||||
ccThread.setDaemon(true);
|
||||
|
||||
ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
|
||||
|
||||
OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler);
|
||||
arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id);
|
||||
arfoThread.setDaemon(true);
|
||||
//TODO nocommit, autoscaling framework should start autoAddReplicas trigger automatically (implicitly)
|
||||
Thread autoscalingTriggerCreator = new Thread(createAutoscalingTriggerIfNotExist(), "AutoscalingTriggerCreator");
|
||||
autoscalingTriggerCreator.setDaemon(true);
|
||||
autoscalingTriggerCreator.start();
|
||||
|
||||
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
|
||||
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
|
||||
|
@ -537,7 +541,6 @@ public class Overseer implements Closeable {
|
|||
|
||||
updaterThread.start();
|
||||
ccThread.start();
|
||||
arfoThread.start();
|
||||
triggerThread.start();
|
||||
assert ObjectReleaseTracker.track(this);
|
||||
}
|
||||
|
@ -569,6 +572,43 @@ public class Overseer implements Closeable {
|
|||
assert ObjectReleaseTracker.release(this);
|
||||
}
|
||||
|
||||
private Runnable createAutoscalingTriggerIfNotExist() {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
boolean triggerExist = getZkStateReader().getAutoScalingConfig()
|
||||
.getTriggerConfigs().get(".auto_add_replicas") != null;
|
||||
if (triggerExist) return;
|
||||
} catch (InterruptedException | KeeperException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed when creating .auto_add_replicas trigger");
|
||||
}
|
||||
while (getZkController().getCoreContainer()
|
||||
.getRequestHandler(AutoScalingHandler.HANDLER_PATH) == null) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
String dsl = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL.replace("{{waitFor}}",
|
||||
String.valueOf(config.getAutoReplicaFailoverWaitAfterExpiration()/1000));
|
||||
LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
|
||||
request.getContext().put("httpMethod", "POST");
|
||||
request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(dsl)));
|
||||
SolrQueryResponse response = new SolrQueryResponse();
|
||||
getZkController().getCoreContainer()
|
||||
.getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
|
||||
if (!"success".equals(response.getValues().get("result"))) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
|
||||
"Failed when creating .auto_add_replicas trigger, return " + response);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void doClose() {
|
||||
|
||||
if (updaterThread != null) {
|
||||
|
@ -579,10 +619,6 @@ public class Overseer implements Closeable {
|
|||
IOUtils.closeQuietly(ccThread);
|
||||
ccThread.interrupt();
|
||||
}
|
||||
if (arfoThread != null) {
|
||||
IOUtils.closeQuietly(arfoThread);
|
||||
arfoThread.interrupt();
|
||||
}
|
||||
if (triggerThread != null) {
|
||||
IOUtils.closeQuietly(triggerThread);
|
||||
triggerThread.interrupt();
|
||||
|
@ -598,11 +634,6 @@ public class Overseer implements Closeable {
|
|||
ccThread.join();
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
if (arfoThread != null) {
|
||||
try {
|
||||
arfoThread.join();
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
if (triggerThread != null) {
|
||||
try {
|
||||
triggerThread.join();
|
||||
|
@ -611,7 +642,6 @@ public class Overseer implements Closeable {
|
|||
|
||||
updaterThread = null;
|
||||
ccThread = null;
|
||||
arfoThread = null;
|
||||
triggerThread = null;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,531 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
|
||||
// TODO: how to tmp exclude nodes?
|
||||
|
||||
// TODO: more fine grained failover rules?
|
||||
|
||||
// TODO: test with lots of collections
|
||||
|
||||
// TODO: add config for only failover if replicas is < N
|
||||
|
||||
// TODO: general support for non shared filesystems
|
||||
// this is specialized for a shared file system, but it should
|
||||
// not be much work to generalize
|
||||
|
||||
// NOTE: using replication can slow down failover if a whole
|
||||
// shard is lost.
|
||||
|
||||
/**
|
||||
*
|
||||
* In this simple initial implementation we are limited in how quickly we detect
|
||||
* a failure by a worst case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS + WORK_LOOP_DELAY_MS
|
||||
* and best case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS. Also, consider the time to
|
||||
* create the SolrCore, do any recovery necessary, and warm up the readers.
|
||||
*
|
||||
* NOTE: this will only work with collections created via the collections api because they will have defined
|
||||
* replicationFactor and maxShardsPerNode.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private Integer lastClusterStateVersion;
|
||||
|
||||
private final ExecutorService updateExecutor;
|
||||
private volatile boolean isClosed;
|
||||
private ZkStateReader zkStateReader;
|
||||
private final Cache<String,Long> baseUrlForBadNodes;
|
||||
private Set<String> liveNodes = Collections.EMPTY_SET;
|
||||
|
||||
private final int workLoopDelay;
|
||||
private final int waitAfterExpiration;
|
||||
|
||||
private volatile Thread thread;
|
||||
|
||||
public OverseerAutoReplicaFailoverThread(CloudConfig config, ZkStateReader zkStateReader,
|
||||
UpdateShardHandler updateShardHandler) {
|
||||
this.zkStateReader = zkStateReader;
|
||||
|
||||
this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
|
||||
this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
|
||||
int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
|
||||
|
||||
log.debug(
|
||||
"Starting "
|
||||
+ this.getClass().getSimpleName()
|
||||
+ " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
|
||||
workLoopDelay, waitAfterExpiration, badNodeExpiration);
|
||||
|
||||
baseUrlForBadNodes = CacheBuilder.newBuilder()
|
||||
.concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
// TODO: Speed up our work loop when live_nodes changes??
|
||||
|
||||
updateExecutor = updateShardHandler.getUpdateExecutor();
|
||||
|
||||
|
||||
// TODO: perhaps do a health ping periodically to each node (scaryish)
|
||||
// And/OR work on JIRA issue around self health checks (SOLR-5805)
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
this.thread = Thread.currentThread();
|
||||
while (!this.isClosed) {
|
||||
// work loop
|
||||
log.debug("do " + this.getClass().getSimpleName() + " work loop");
|
||||
|
||||
// every n, look at state and make add / remove calls
|
||||
|
||||
try {
|
||||
doWork();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, this.getClass().getSimpleName()
|
||||
+ " had an error in its thread work loop.", e);
|
||||
}
|
||||
|
||||
if (!this.isClosed) {
|
||||
try {
|
||||
Thread.sleep(workLoopDelay);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doWork() {
|
||||
|
||||
// TODO: extract to configurable strategy class ??
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
//check if we have disabled autoAddReplicas cluster wide
|
||||
String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
|
||||
if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
|
||||
return;
|
||||
}
|
||||
if (clusterState != null) {
|
||||
if (clusterState.getZkClusterStateVersion() != null &&
|
||||
clusterState.getZkClusterStateVersion().equals(lastClusterStateVersion) && baseUrlForBadNodes.size() == 0 &&
|
||||
liveNodes.equals(clusterState.getLiveNodes())) {
|
||||
// nothing has changed, no work to do
|
||||
return;
|
||||
}
|
||||
|
||||
liveNodes = clusterState.getLiveNodes();
|
||||
lastClusterStateVersion = clusterState.getZkClusterStateVersion();
|
||||
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
|
||||
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
|
||||
log.debug("look at collection={}", entry.getKey());
|
||||
DocCollection docCollection = entry.getValue();
|
||||
if (!docCollection.getAutoAddReplicas()) {
|
||||
log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
|
||||
continue;
|
||||
}
|
||||
if (docCollection.getReplicationFactor() == null) {
|
||||
log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
|
||||
continue;
|
||||
}
|
||||
log.debug("Found collection, name={} replicationFactor={}", entry.getKey(), docCollection.getReplicationFactor());
|
||||
|
||||
Collection<Slice> slices = docCollection.getSlices();
|
||||
for (Slice slice : slices) {
|
||||
if (slice.getState() == Slice.State.ACTIVE) {
|
||||
|
||||
final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
|
||||
|
||||
int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
|
||||
|
||||
log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
|
||||
|
||||
if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
|
||||
// badReplicaMap.put(collection, badReplicas);
|
||||
processBadReplicas(entry.getKey(), downReplicas);
|
||||
} else if (goodReplicas > docCollection.getReplicationFactor()) {
|
||||
log.debug("There are too many replicas");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
|
||||
for (DownReplica badReplica : badReplicas) {
|
||||
log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
|
||||
String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
|
||||
if (wentBadAtNS == null) {
|
||||
log.warn("Replica {} may need to failover.",
|
||||
badReplica.replica.getName());
|
||||
baseUrlForBadNodes.put(baseUrl, System.nanoTime());
|
||||
|
||||
} else {
|
||||
|
||||
long elasped = System.nanoTime() - wentBadAtNS;
|
||||
if (elasped < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
|
||||
// protect against ZK 'flapping', startup and shutdown
|
||||
log.debug("Looks troublesome...continue. Elapsed={}", elasped + "ns");
|
||||
} else {
|
||||
log.debug("We need to add a replica. Elapsed={}", elasped + "ns");
|
||||
|
||||
if (addReplica(collection, badReplica)) {
|
||||
baseUrlForBadNodes.invalidate(baseUrl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean addReplica(final String collection, DownReplica badReplica) {
|
||||
// first find best home - first strategy, sort by number of cores
|
||||
// hosted where maxCoresPerNode is not violated
|
||||
final Integer maxCoreCount = zkStateReader.getClusterProperty(ZkStateReader.MAX_CORES_PER_NODE, (Integer) null);
|
||||
final String createUrl = getBestCreateUrl(zkStateReader, badReplica, maxCoreCount);
|
||||
if (createUrl == null) {
|
||||
log.warn("Could not find a node to create new replica on.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// NOTE: we send the absolute path, which will slightly change
|
||||
// behavior of these cores as they won't respond to changes
|
||||
// in the solr.hdfs.home sys prop as they would have.
|
||||
final String dataDir = badReplica.replica.getStr("dataDir");
|
||||
final String ulogDir = badReplica.replica.getStr("ulogDir");
|
||||
final String coreNodeName = badReplica.replica.getName();
|
||||
final String shardId = badReplica.slice.getName();
|
||||
if (dataDir != null) {
|
||||
// need an async request - full shard goes down leader election
|
||||
final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
|
||||
log.debug("submit call to {}", createUrl);
|
||||
MDC.put("OverseerAutoReplicaFailoverThread.createUrl", createUrl);
|
||||
try {
|
||||
updateExecutor.submit(() -> createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName, shardId));
|
||||
} finally {
|
||||
MDC.remove("OverseerAutoReplicaFailoverThread.createUrl");
|
||||
}
|
||||
|
||||
// wait to see state for core we just created
|
||||
boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
|
||||
if (!success) {
|
||||
log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
log.warn("Could not find dataDir or ulogDir in cluster state.");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
|
||||
int goodReplicas = 0;
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
if (replicas != null) {
|
||||
for (Replica replica : replicas) {
|
||||
// on a live node?
|
||||
boolean live = clusterState.liveNodesContain(replica.getNodeName());
|
||||
final Replica.State state = replica.getState();
|
||||
|
||||
final boolean okayState = state == Replica.State.DOWN
|
||||
|| state == Replica.State.RECOVERING
|
||||
|| state == Replica.State.ACTIVE;
|
||||
|
||||
log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
|
||||
|
||||
if (live && okayState) {
|
||||
goodReplicas++;
|
||||
} else {
|
||||
DownReplica badReplica = new DownReplica();
|
||||
badReplica.replica = replica;
|
||||
badReplica.slice = slice;
|
||||
badReplica.collection = collection;
|
||||
badReplicas.add(badReplica);
|
||||
}
|
||||
}
|
||||
}
|
||||
log.debug("bad replicas for slice {}", badReplicas);
|
||||
return goodReplicas;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the best node to replace the badReplica on or null if there is no
|
||||
* such node
|
||||
*/
|
||||
static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica, Integer maxCoreCount) {
|
||||
assert badReplica != null;
|
||||
assert badReplica.collection != null;
|
||||
assert badReplica.slice != null;
|
||||
log.debug("getBestCreateUrl for " + badReplica.replica);
|
||||
Map<String,Counts> counts = new HashMap<>();
|
||||
Set<String> unsuitableHosts = new HashSet<>();
|
||||
|
||||
Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
|
||||
Map<String, Integer> coresPerNode = new HashMap<>();
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
if (clusterState != null) {
|
||||
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
|
||||
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
|
||||
String collection = entry.getKey();
|
||||
log.debug("look at collection {} as possible create candidate", collection);
|
||||
DocCollection docCollection = entry.getValue();
|
||||
// TODO - only operate on collections with sharedfs failover = true ??
|
||||
Collection<Slice> slices = docCollection.getSlices();
|
||||
for (Slice slice : slices) {
|
||||
// only look at active shards
|
||||
if (slice.getState() == Slice.State.ACTIVE) {
|
||||
log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection);
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
|
||||
for (Replica replica : replicas) {
|
||||
liveNodes.remove(replica.getNodeName());
|
||||
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
|
||||
if (coresPerNode.containsKey(baseUrl)) {
|
||||
Integer nodeCount = coresPerNode.get(baseUrl);
|
||||
coresPerNode.put(baseUrl, nodeCount++);
|
||||
} else {
|
||||
coresPerNode.put(baseUrl, 1);
|
||||
}
|
||||
if (baseUrl.equals(badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
|
||||
continue;
|
||||
}
|
||||
// on a live node?
|
||||
log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
|
||||
boolean live = clusterState.liveNodesContain(replica.getNodeName());
|
||||
log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live);
|
||||
if (live) {
|
||||
Counts cnt = counts.get(baseUrl);
|
||||
if (cnt == null) {
|
||||
cnt = new Counts();
|
||||
}
|
||||
if (badReplica.collection.getName().equals(collection)) {
|
||||
cnt.negRankingWeight += 3;
|
||||
cnt.collectionShardsOnNode += 1;
|
||||
} else {
|
||||
cnt.negRankingWeight += 1;
|
||||
}
|
||||
if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
|
||||
cnt.ourReplicas++;
|
||||
}
|
||||
|
||||
Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
|
||||
if (maxShardsPerNode == null) {
|
||||
log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
|
||||
maxShardsPerNode = Integer.MAX_VALUE;
|
||||
}
|
||||
log.debug("collection={} node={} maxShardsPerNode={} maxCoresPerNode={} potential hosts={}",
|
||||
collection, baseUrl, maxShardsPerNode, maxCoreCount, cnt);
|
||||
|
||||
Collection<Replica> badSliceReplicas = null;
|
||||
DocCollection c = clusterState.getCollection(badReplica.collection.getName());
|
||||
if (c != null) {
|
||||
Slice s = c.getSlice(badReplica.slice.getName());
|
||||
if (s != null) {
|
||||
badSliceReplicas = s.getReplicas();
|
||||
}
|
||||
}
|
||||
boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
|
||||
if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode
|
||||
|| (maxCoreCount != null && coresPerNode.get(baseUrl) >= maxCoreCount) ) {
|
||||
counts.remove(baseUrl);
|
||||
unsuitableHosts.add(baseUrl);
|
||||
log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
|
||||
} else {
|
||||
counts.put(baseUrl, cnt);
|
||||
log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String node : liveNodes) {
|
||||
counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
|
||||
}
|
||||
|
||||
if (counts.size() == 0) {
|
||||
log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
ValueComparator vc = new ValueComparator(counts);
|
||||
Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
|
||||
sortedCounts.putAll(counts);
|
||||
|
||||
log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
|
||||
log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
|
||||
log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
|
||||
|
||||
return sortedCounts.keySet().iterator().next();
|
||||
}
|
||||
|
||||
private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
|
||||
if (replicas != null) {
|
||||
log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
|
||||
for (Replica replica : replicas) {
|
||||
final Replica.State state = replica.getState();
|
||||
if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
|
||||
&& clusterState.liveNodesContain(replica.getNodeName())
|
||||
&& (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
|
||||
log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl);
|
||||
return false;
|
||||
}
|
||||
|
||||
private static Object getNames(Collection<Replica> replicas) {
|
||||
Set<String> names = new HashSet<>(replicas.size());
|
||||
for (Replica replica : replicas) {
|
||||
names.add(replica.getName());
|
||||
}
|
||||
return names;
|
||||
}
|
||||
|
||||
private boolean createSolrCore(final String collection,
|
||||
final String createUrl, final String dataDir, final String ulogDir,
|
||||
final String coreNodeName, final String coreName, final String shardId) {
|
||||
|
||||
try (HttpSolrClient client = new HttpSolrClient.Builder(createUrl)
|
||||
.withConnectionTimeout(30000)
|
||||
.withSocketTimeout(60000)
|
||||
.build()) {
|
||||
log.debug("create url={}", createUrl);
|
||||
Create createCmd = new Create();
|
||||
createCmd.setCollection(collection);
|
||||
createCmd.setCoreNodeName(coreNodeName);
|
||||
// TODO: how do we ensure unique coreName
|
||||
// for now, the collections API will use unique names
|
||||
createCmd.setShardId(shardId);
|
||||
createCmd.setCoreName(coreName);
|
||||
createCmd.setDataDir(dataDir);
|
||||
createCmd.setUlogDir(ulogDir.substring(0, ulogDir.length() - "/tlog".length()));
|
||||
client.request(createCmd);
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static class ValueComparator implements Comparator<String> {
|
||||
Map<String,Counts> map;
|
||||
|
||||
public ValueComparator(Map<String,Counts> map) {
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
public int compare(String a, String b) {
|
||||
if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
|
||||
return 1;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
isClosed = true;
|
||||
Thread lThread = thread;
|
||||
if (lThread != null) {
|
||||
lThread.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return isClosed;
|
||||
}
|
||||
|
||||
|
||||
private static class Counts {
|
||||
int collectionShardsOnNode = 0;
|
||||
int negRankingWeight = 0;
|
||||
int ourReplicas = 0;
|
||||
|
||||
private Counts() {
|
||||
|
||||
}
|
||||
|
||||
private Counts(int totalReplicas, int ourReplicas) {
|
||||
this.negRankingWeight = totalReplicas;
|
||||
this.ourReplicas = ourReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
|
||||
+ ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
|
||||
}
|
||||
}
|
||||
|
||||
static class DownReplica {
|
||||
Replica replica;
|
||||
Slice slice;
|
||||
DocCollection collection;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DownReplica [replica=" + replica.getName() + ", slice="
|
||||
+ slice.getName() + ", collection=" + collection.getName() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -41,9 +41,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
|
|||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.UpdateResponse;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScaling;
|
||||
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
|
@ -62,8 +59,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
|
|||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ContentStreamBase;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
|
@ -74,10 +69,6 @@ import org.apache.solr.handler.component.ShardHandler;
|
|||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
import org.apache.solr.handler.component.ShardRequest;
|
||||
import org.apache.solr.handler.component.ShardResponse;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.request.SolrRequestInfo;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.RTimer;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
|
@ -678,10 +669,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
if (message.getBool(ZkStateReader.AUTO_ADD_REPLICAS, false)) {
|
||||
forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
|
||||
}
|
||||
|
||||
if (!areChangesVisible)
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
|
||||
}
|
||||
|
@ -881,17 +868,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
|
|||
}
|
||||
}
|
||||
|
||||
void forwardToAutoScaling(String command) {
|
||||
LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
|
||||
request.getContext().put("httpMethod", "POST");
|
||||
request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(command)));
|
||||
SolrQueryResponse response = new SolrQueryResponse();
|
||||
overseer.getZkController().getCoreContainer().getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
|
||||
if (!"success".equals(response.getValues().get("result"))) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed when execute command on autoScalingHandler, return " + response);
|
||||
}
|
||||
}
|
||||
|
||||
private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
|
||||
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
|
|
@ -1482,6 +1482,7 @@ public class ZkController {
|
|||
if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
|
||||
descriptor.getCloudDescriptor()
|
||||
.setCoreNodeName(replica.getName());
|
||||
getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,35 +17,35 @@
|
|||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
|
||||
public class AutoAddReplicasPlanAction extends ComputePlanAction {
|
||||
Set<String> autoAddReplicasCollections;
|
||||
|
||||
@Override
|
||||
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) {
|
||||
Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
|
||||
if (autoAddReplicasCollections == null) {
|
||||
autoAddReplicasCollections = new HashSet<>();
|
||||
// for backward compatibility
|
||||
String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
|
||||
if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
|
||||
return new NoneSuggester();
|
||||
}
|
||||
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
for (DocCollection collection: clusterState.getCollectionsMap().values()) {
|
||||
if (collection.getAutoAddReplicas()) {
|
||||
autoAddReplicasCollections.add(collection.getName());
|
||||
}
|
||||
Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
|
||||
boolean anyCollections = false;
|
||||
for (DocCollection collection: clusterState.getCollectionsMap().values()) {
|
||||
if (collection.getAutoAddReplicas()) {
|
||||
anyCollections = true;
|
||||
suggester.hint(Policy.Suggester.Hint.COLL, collection.getName());
|
||||
}
|
||||
}
|
||||
|
||||
for (String collection : autoAddReplicasCollections) {
|
||||
suggester.hint(Policy.Suggester.Hint.COLL, collection);
|
||||
}
|
||||
|
||||
if (!anyCollections) return new NoneSuggester();
|
||||
return suggester;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ public class AutoScaling {
|
|||
" 'set-trigger' : {" +
|
||||
" 'name' : '.auto_add_replicas'," +
|
||||
" 'event' : 'nodeLost'," +
|
||||
" 'waitFor' : '5s'," +
|
||||
" 'waitFor' : '{{waitFor}}s'," +
|
||||
" 'enabled' : true," +
|
||||
" 'actions' : [" +
|
||||
" {" +
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.Replica;
|
|||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
|
|
|
@ -38,10 +38,6 @@ public class CloudConfig {
|
|||
|
||||
private final int autoReplicaFailoverWaitAfterExpiration;
|
||||
|
||||
private final int autoReplicaFailoverWorkLoopDelay;
|
||||
|
||||
private final int autoReplicaFailoverBadNodeExpiration;
|
||||
|
||||
private final String zkCredentialsProviderClass;
|
||||
|
||||
private final String zkACLProviderClass;
|
||||
|
@ -51,9 +47,9 @@ public class CloudConfig {
|
|||
private final boolean createCollectionCheckLeaderActive;
|
||||
|
||||
CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames,
|
||||
int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
|
||||
int autoReplicaFailoverWorkLoopDelay, int autoReplicaFailoverBadNodeExpiration, String zkCredentialsProviderClass,
|
||||
String zkACLProviderClass, int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive) {
|
||||
int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
|
||||
String zkCredentialsProviderClass, String zkACLProviderClass, int createCollectionWaitTimeTillActive,
|
||||
boolean createCollectionCheckLeaderActive) {
|
||||
this.zkHost = zkHost;
|
||||
this.zkClientTimeout = zkClientTimeout;
|
||||
this.hostPort = hostPort;
|
||||
|
@ -63,8 +59,6 @@ public class CloudConfig {
|
|||
this.leaderVoteWait = leaderVoteWait;
|
||||
this.leaderConflictResolveWait = leaderConflictResolveWait;
|
||||
this.autoReplicaFailoverWaitAfterExpiration = autoReplicaFailoverWaitAfterExpiration;
|
||||
this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
|
||||
this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
|
||||
this.zkCredentialsProviderClass = zkCredentialsProviderClass;
|
||||
this.zkACLProviderClass = zkACLProviderClass;
|
||||
this.createCollectionWaitTimeTillActive = createCollectionWaitTimeTillActive;
|
||||
|
@ -116,14 +110,6 @@ public class CloudConfig {
|
|||
return autoReplicaFailoverWaitAfterExpiration;
|
||||
}
|
||||
|
||||
public int getAutoReplicaFailoverWorkLoopDelay() {
|
||||
return autoReplicaFailoverWorkLoopDelay;
|
||||
}
|
||||
|
||||
public int getAutoReplicaFailoverBadNodeExpiration() {
|
||||
return autoReplicaFailoverBadNodeExpiration;
|
||||
}
|
||||
|
||||
public boolean getGenericCoreNodeNames() {
|
||||
return useGenericCoreNames;
|
||||
}
|
||||
|
@ -146,8 +132,6 @@ public class CloudConfig {
|
|||
|
||||
// TODO: tune defaults
|
||||
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 30000;
|
||||
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY = 10000;
|
||||
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION = 60000;
|
||||
|
||||
private String zkHost = System.getProperty("zkHost");
|
||||
private int zkClientTimeout = Integer.getInteger("zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
|
||||
|
@ -158,8 +142,6 @@ public class CloudConfig {
|
|||
private int leaderVoteWait = DEFAULT_LEADER_VOTE_WAIT;
|
||||
private int leaderConflictResolveWait = DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT;
|
||||
private int autoReplicaFailoverWaitAfterExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION;
|
||||
private int autoReplicaFailoverWorkLoopDelay = DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY;
|
||||
private int autoReplicaFailoverBadNodeExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION;
|
||||
private String zkCredentialsProviderClass;
|
||||
private String zkACLProviderClass;
|
||||
private int createCollectionWaitTimeTillActive = DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT;
|
||||
|
@ -205,16 +187,6 @@ public class CloudConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public CloudConfigBuilder setAutoReplicaFailoverWorkLoopDelay(int autoReplicaFailoverWorkLoopDelay) {
|
||||
this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CloudConfigBuilder setAutoReplicaFailoverBadNodeExpiration(int autoReplicaFailoverBadNodeExpiration) {
|
||||
this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CloudConfigBuilder setZkCredentialsProviderClass(String zkCredentialsProviderClass) {
|
||||
this.zkCredentialsProviderClass = zkCredentialsProviderClass;
|
||||
return this;
|
||||
|
@ -237,8 +209,7 @@ public class CloudConfig {
|
|||
|
||||
public CloudConfig build() {
|
||||
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
|
||||
leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, autoReplicaFailoverWorkLoopDelay,
|
||||
autoReplicaFailoverBadNodeExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
|
||||
leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
|
||||
createCollectionCheckLeaderActive);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.solr.security.HttpClientBuilderPlugin;
|
|||
import org.apache.solr.security.PKIAuthenticationPlugin;
|
||||
import org.apache.solr.security.SecurityPluginHolder;
|
||||
import org.apache.solr.update.SolrCoreState;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.stats.MetricUtils;
|
||||
|
@ -965,7 +966,7 @@ public class CoreContainer {
|
|||
zkSys.getZkController().preRegister(dcore);
|
||||
}
|
||||
|
||||
ConfigSet coreConfig = coreConfigService.getConfig(dcore);
|
||||
ConfigSet coreConfig = getConfigSet(dcore);
|
||||
dcore.setConfigSetTrusted(coreConfig.isTrusted());
|
||||
log.info("Creating SolrCore '{}' using configuration from {}, trusted={}", dcore.getName(), coreConfig.getName(), dcore.isConfigSetTrusted());
|
||||
try {
|
||||
|
@ -1000,6 +1001,21 @@ public class CoreContainer {
|
|||
MDCLoggingContext.clear();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSharedFs(CoreDescriptor cd) {
|
||||
try (SolrCore core = this.getCore(cd.getName())) {
|
||||
if (core != null) {
|
||||
return core.getDirectoryFactory().isSharedStorage();
|
||||
} else {
|
||||
ConfigSet configSet = getConfigSet(cd);
|
||||
return DirectoryFactory.loadDirectoryFactory(configSet.getSolrConfig(), this, null).isSharedStorage();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ConfigSet getConfigSet(CoreDescriptor cd) {
|
||||
return coreConfigService.getConfig(cd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery
|
||||
|
|
|
@ -376,15 +376,13 @@ public class SolrXmlConfig {
|
|||
case "zkClientTimeout":
|
||||
builder.setZkClientTimeout(parseInt(name, value));
|
||||
break;
|
||||
case "autoReplicaFailoverBadNodeExpiration":
|
||||
builder.setAutoReplicaFailoverBadNodeExpiration(parseInt(name, value));
|
||||
case "autoReplicaFailoverBadNodeExpiration": case "autoReplicaFailoverWorkLoopDelay":
|
||||
//TODO remove this in Solr 8.0
|
||||
log.info("Configuration parameter " + name + " is ignored");
|
||||
break;
|
||||
case "autoReplicaFailoverWaitAfterExpiration":
|
||||
builder.setAutoReplicaFailoverWaitAfterExpiration(parseInt(name, value));
|
||||
break;
|
||||
case "autoReplicaFailoverWorkLoopDelay":
|
||||
builder.setAutoReplicaFailoverWorkLoopDelay(parseInt(name, value));
|
||||
break;
|
||||
case "zkHost":
|
||||
builder.setZkHost(value);
|
||||
break;
|
||||
|
|
|
@ -18,13 +18,10 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -40,22 +37,12 @@ public class ClusterStateMockUtil {
|
|||
|
||||
private final static Pattern BLUEPRINT = Pattern.compile("([a-z])(\\d+)?(?:(['A','R','D','F']))?(\\*)?");
|
||||
|
||||
protected static class Result implements Closeable {
|
||||
OverseerAutoReplicaFailoverThread.DownReplica badReplica;
|
||||
ZkStateReader reader;
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
reader.close();
|
||||
}
|
||||
protected static ZkStateReader buildClusterState(String string, String ... liveNodes) {
|
||||
return buildClusterState(string, 1, liveNodes);
|
||||
}
|
||||
|
||||
protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, String ... liveNodes) {
|
||||
return buildClusterState(results, string, 1, liveNodes);
|
||||
}
|
||||
|
||||
protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, int replicationFactor, String ... liveNodes) {
|
||||
return buildClusterState(results, string, replicationFactor, 10, liveNodes);
|
||||
protected static ZkStateReader buildClusterState(String string, int replicationFactor, String ... liveNodes) {
|
||||
return buildClusterState(string, replicationFactor, 10, liveNodes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -118,9 +105,7 @@ public class ClusterStateMockUtil {
|
|||
*
|
||||
*/
|
||||
@SuppressWarnings("resource")
|
||||
protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
|
||||
ClusterStateMockUtil.Result result = new ClusterStateMockUtil.Result();
|
||||
|
||||
protected static ZkStateReader buildClusterState(String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
|
||||
Map<String,Slice> slices = null;
|
||||
Map<String,Replica> replicas = null;
|
||||
Map<String,Object> collectionProps = new HashMap<>();
|
||||
|
@ -181,23 +166,12 @@ public class ClusterStateMockUtil {
|
|||
String nodeName = "baseUrl" + node + "_";
|
||||
String replicaName = "replica" + replicaCount++;
|
||||
|
||||
if ("*".equals(m.group(4))) {
|
||||
replicaName += " (bad)";
|
||||
}
|
||||
|
||||
replicaPropMap.put(ZkStateReader.NODE_NAME_PROP, nodeName);
|
||||
replicaPropMap.put(ZkStateReader.BASE_URL_PROP, "http://baseUrl" + node);
|
||||
replicaPropMap.put(ZkStateReader.STATE_PROP, state.toString());
|
||||
|
||||
replica = new Replica(replicaName, replicaPropMap);
|
||||
|
||||
if ("*".equals(m.group(4))) {
|
||||
result.badReplica = new OverseerAutoReplicaFailoverThread.DownReplica();
|
||||
result.badReplica.replica = replica;
|
||||
result.badReplica.slice = slice;
|
||||
result.badReplica.collection = docCollection;
|
||||
}
|
||||
|
||||
replicas.put(replica.getName(), replica);
|
||||
break;
|
||||
default:
|
||||
|
@ -216,17 +190,7 @@ public class ClusterStateMockUtil {
|
|||
}
|
||||
System.err.println(json);
|
||||
|
||||
// todo remove the limitation of always having a bad replica
|
||||
assert result.badReplica != null : "Is there no bad replica?";
|
||||
assert result.badReplica.slice != null : "Is there no bad replica?";
|
||||
|
||||
result.reader = reader;
|
||||
|
||||
if (results != null) {
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
return result;
|
||||
return reader;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.junit.Test;
|
|||
BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
|
||||
MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
|
||||
})
|
||||
public class MoveReplicaHDFSUlogDirTest extends SolrCloudTestCase {
|
||||
public class MoveReplicaHDFSFailoverTest extends SolrCloudTestCase {
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
@BeforeClass
|
||||
|
@ -128,6 +128,71 @@ public class MoveReplicaHDFSUlogDirTest extends SolrCloudTestCase {
|
|||
|
||||
cluster.getSolrClient().commit(coll);
|
||||
assertEquals(numDocs, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOldReplicaIsDeleted() throws Exception {
|
||||
String coll = "movereplicatest_coll3";
|
||||
CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
|
||||
.setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
|
||||
.process(cluster.getSolrClient());
|
||||
addDocs(coll, 2);
|
||||
Replica replica = getCollectionState(coll).getReplicas().iterator().next();
|
||||
|
||||
cluster.getJettySolrRunners().get(0).stop();
|
||||
assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
// move replica from node0 -> node1
|
||||
new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
|
||||
.process(cluster.getSolrClient());
|
||||
assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
cluster.getJettySolrRunners().get(1).stop();
|
||||
assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
// node0 will delete it replica because of CloudUtil.checkSharedFSFailoverReplaced()
|
||||
cluster.getJettySolrRunners().get(0).start();
|
||||
Thread.sleep(5000);
|
||||
assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
cluster.getJettySolrRunners().get(1).start();
|
||||
assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
assertEquals(1, getCollectionState(coll).getReplicas().size());
|
||||
assertEquals(2, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOldReplicaIsDeletedInRaceCondition() throws Exception {
|
||||
String coll = "movereplicatest_coll4";
|
||||
CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
|
||||
.setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
|
||||
.process(cluster.getSolrClient());
|
||||
addDocs(coll, 100);
|
||||
Replica replica = getCollectionState(coll).getReplicas().iterator().next();
|
||||
|
||||
cluster.getJettySolrRunners().get(0).stop();
|
||||
assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
// move replica from node0 -> node1
|
||||
new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
|
||||
.process(cluster.getSolrClient());
|
||||
assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
cluster.getJettySolrRunners().get(1).stop();
|
||||
assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
cluster.getJettySolrRunners().get(1).start();
|
||||
// node0 will delete it replica because of CloudUtil.checkSharedFSFailoverReplaced()
|
||||
cluster.getJettySolrRunners().get(0).start();
|
||||
Thread.sleep(5000);
|
||||
assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
|
||||
|
||||
assertEquals(1, getCollectionState(coll).getReplicas().size());
|
||||
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
|
||||
CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
|
||||
}
|
||||
|
||||
private void addDocs(String collection, int numDocs) throws SolrServerException, IOException {
|
|
@ -42,31 +42,30 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
|
|||
@Test
|
||||
public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws IOException {
|
||||
NodeMutator nm = new NodeMutator();
|
||||
ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
|
||||
|
||||
//We use 2 nodes with maxShardsPerNode as 1
|
||||
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
|
||||
//Collection2: 1 shard X 1 replica = replica1 on node2
|
||||
ClusterStateMockUtil.Result result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2", 1, 1, NODE1, NODE2);
|
||||
ClusterState clusterState = result.reader.getClusterState();
|
||||
ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2", 1, 1, NODE1, NODE2);
|
||||
ClusterState clusterState = reader.getClusterState();
|
||||
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
|
||||
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
|
||||
assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
|
||||
|
||||
props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
|
||||
ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
|
||||
List<ZkWriteCommand> writes = nm.downNode(clusterState, props);
|
||||
assertEquals(writes.size(), 1);
|
||||
assertEquals(writes.get(0).name, "collection1");
|
||||
assertEquals(writes.get(0).collection.getReplica("replica1").getState(), Replica.State.DOWN);
|
||||
assertEquals(writes.get(0).collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
|
||||
result.close();
|
||||
reader.close();
|
||||
|
||||
//We use 3 nodes with maxShardsPerNode as 1
|
||||
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
|
||||
//Collection2: 1 shard X 1 replica = replica1 on node2
|
||||
//Collection3: 1 shard X 3 replica = replica1 on node1 , replica2 on node2, replica3 on node3
|
||||
result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
|
||||
clusterState = result.reader.getClusterState();
|
||||
reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
|
||||
clusterState = reader.getClusterState();
|
||||
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
|
||||
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
|
||||
|
||||
|
@ -90,6 +89,6 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
|
|||
fail("No other collection needs to be changed");
|
||||
}
|
||||
}
|
||||
result.close();
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,12 +16,11 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletionService;
|
||||
|
@ -30,33 +29,35 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import com.carrotsearch.randomizedtesting.annotations.Nightly;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.update.DirectUpdateHandler2;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -71,7 +72,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
|||
BadHdfsThreadsFilter.class // hdfs currently leaks thread(s)
|
||||
})
|
||||
public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBase {
|
||||
|
||||
|
||||
private static final boolean DEBUG = true;
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
|
@ -210,9 +211,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
|
||||
Thread.sleep(5000);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
|
||||
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
assertSliceAndReplicaCount(collection1, 2, 2, 120000);
|
||||
|
||||
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
|
||||
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
|
||||
|
@ -225,7 +224,7 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
|
||||
// all docs should be queried after failover
|
||||
cloudClient.commit(); // to query all docs
|
||||
assertSingleReplicationAndShardSize(collection4, 5);
|
||||
assertSliceAndReplicaCount(collection4, 5, 1, 120000);
|
||||
queryAndAssertResultSize(collection4, numDocs, 10000);
|
||||
|
||||
// collection1 should still be at 4
|
||||
|
@ -235,21 +234,21 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
|
||||
assertUlogDir(collections);
|
||||
|
||||
ChaosMonkey.stop(jettys);
|
||||
//TODO nocommit, we must test overseer failover
|
||||
List<JettySolrRunner> stoppedJetties = notOverseerJetties();
|
||||
ChaosMonkey.stop(stoppedJetties);
|
||||
ChaosMonkey.stop(controlJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
|
||||
assertTrue("Timeout waiting for all not live", waitingForReplicasNotLive(cloudClient.getZkStateReader(), 45000, stoppedJetties));
|
||||
|
||||
ChaosMonkey.start(jettys);
|
||||
ChaosMonkey.start(stoppedJetties);
|
||||
ChaosMonkey.start(controlJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
|
||||
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
assertSingleReplicationAndShardSize(collection3, 5);
|
||||
assertSliceAndReplicaCount(collection1, 2, 2, 120000);
|
||||
assertSliceAndReplicaCount(collection3, 5, 1, 120000);
|
||||
|
||||
// all docs should be queried
|
||||
assertSingleReplicationAndShardSize(collection4, 5);
|
||||
assertSliceAndReplicaCount(collection4, 5, 1, 120000);
|
||||
queryAndAssertResultSize(collection4, numDocs, 10000);
|
||||
|
||||
assertUlogDir(collections);
|
||||
|
@ -257,75 +256,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
int jettyIndex = random().nextInt(jettys.size());
|
||||
ChaosMonkey.stop(jettys.get(jettyIndex));
|
||||
ChaosMonkey.start(jettys.get(jettyIndex));
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
|
||||
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
|
||||
assertSliceAndReplicaCount(collection1, 2, 2, 120000);
|
||||
|
||||
assertUlogDir(collections);
|
||||
|
||||
assertSingleReplicationAndShardSize(collection3, 5);
|
||||
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
|
||||
|
||||
assertSingleReplicationAndShardSize(collection4, 5);
|
||||
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection4, 5, 30000);
|
||||
|
||||
//disable autoAddReplicas
|
||||
Map m = makeMap(
|
||||
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
||||
"name", ZkStateReader.AUTO_ADD_REPLICAS,
|
||||
"val", "false");
|
||||
|
||||
SolrRequest request = new QueryRequest(new MapSolrParams(m));
|
||||
request.setPath("/admin/collections");
|
||||
cloudClient.request(request);
|
||||
|
||||
int currentCount = ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1);
|
||||
|
||||
ChaosMonkey.stop(jettys.get(3));
|
||||
|
||||
//solr.xml has defined workLoopDelay=10s and waitAfterExpiration=10s
|
||||
//Hence waiting for 30 seconds to be on the safe side.
|
||||
Thread.sleep(30000);
|
||||
//Ensures that autoAddReplicas has not kicked in.
|
||||
assertTrue(currentCount > ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
|
||||
|
||||
//enable autoAddReplicas
|
||||
m = makeMap(
|
||||
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
||||
"name", ZkStateReader.AUTO_ADD_REPLICAS);
|
||||
|
||||
request = new QueryRequest(new MapSolrParams(m));
|
||||
request.setPath("/admin/collections");
|
||||
cloudClient.request(request);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 90000));
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
|
||||
assertUlogDir(collections);
|
||||
|
||||
// restart all to test core saved state
|
||||
|
||||
ChaosMonkey.stop(jettys);
|
||||
ChaosMonkey.stop(controlJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
|
||||
|
||||
ChaosMonkey.start(jettys);
|
||||
ChaosMonkey.start(controlJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
|
||||
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
|
||||
assertUlogDir(collections);
|
||||
|
||||
assertSliceAndReplicaCount(collection1);
|
||||
assertSingleReplicationAndShardSize(collection3, 5);
|
||||
|
||||
// all docs should be queried
|
||||
assertSingleReplicationAndShardSize(collection4, 5);
|
||||
queryAndAssertResultSize(collection4, numDocs, 10000);
|
||||
assertSliceAndReplicaCount(collection3, 5, 1, 120000);
|
||||
assertSliceAndReplicaCount(collection4, 5, 1, 120000);
|
||||
}
|
||||
|
||||
private void queryAndAssertResultSize(String collection, int expectedResultSize, int timeoutMS)
|
||||
|
@ -386,26 +323,83 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
}
|
||||
}
|
||||
|
||||
private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
|
||||
Collection<Slice> slices;
|
||||
slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
|
||||
assertEquals(numSlices, slices.size());
|
||||
for (Slice slice : slices) {
|
||||
assertEquals(1, slice.getReplicas().size());
|
||||
}
|
||||
// Overseer failover is not currently guaranteed with MoveReplica or Policy Framework
|
||||
private List<JettySolrRunner> notOverseerJetties() throws IOException, SolrServerException {
|
||||
CollectionAdminResponse response = CollectionAdminRequest.getOverseerStatus().process(cloudClient);
|
||||
String overseerNode = (String) response.getResponse().get("leader");
|
||||
return jettys.stream().filter(jetty -> !(jetty.getCoreContainer() != null && overseerNode.equals(jetty.getNodeName())))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void assertSliceAndReplicaCount(String collection) {
|
||||
Collection<Slice> slices;
|
||||
slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
|
||||
assertEquals(2, slices.size());
|
||||
for (Slice slice : slices) {
|
||||
assertEquals(2, slice.getReplicas().size());
|
||||
private boolean waitingForReplicasNotLive(ZkStateReader zkStateReader, int timeoutInMs, List<JettySolrRunner> jetties) {
|
||||
Set<String> nodeNames = jetties.stream()
|
||||
.filter(jetty -> jetty.getCoreContainer() != null)
|
||||
.map(JettySolrRunner::getNodeName)
|
||||
.collect(Collectors.toSet());
|
||||
long timeout = System.nanoTime()
|
||||
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
|
||||
boolean success = false;
|
||||
while (!success && System.nanoTime() < timeout) {
|
||||
success = true;
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
if (clusterState != null) {
|
||||
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
|
||||
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
|
||||
DocCollection docCollection = entry.getValue();
|
||||
Collection<Slice> slices = docCollection.getSlices();
|
||||
for (Slice slice : slices) {
|
||||
// only look at active shards
|
||||
if (slice.getState() == Slice.State.ACTIVE) {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if (nodeNames.contains(replica.getNodeName())) {
|
||||
boolean live = clusterState.liveNodesContain(replica
|
||||
.getNodeName());
|
||||
if (live) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void distribTearDown() throws Exception {
|
||||
super.distribTearDown();
|
||||
|
||||
private void assertSliceAndReplicaCount(String collection, int numSlices, int numReplicas, int timeOutInMs) throws InterruptedException {
|
||||
TimeOut timeOut = new TimeOut(timeOutInMs, TimeUnit.MILLISECONDS);
|
||||
while (!timeOut.hasTimedOut()) {
|
||||
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
|
||||
Collection<Slice> slices = clusterState.getCollection(collection).getActiveSlices();
|
||||
if (slices.size() == numSlices) {
|
||||
boolean isMatch = true;
|
||||
for (Slice slice : slices) {
|
||||
int count = 0;
|
||||
for (Replica replica : slice.getReplicas()) {
|
||||
if (replica.getState() == Replica.State.ACTIVE && clusterState.liveNodesContain(replica.getNodeName())) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
if (count < numReplicas) {
|
||||
isMatch = false;
|
||||
}
|
||||
}
|
||||
if (isMatch) return;
|
||||
}
|
||||
Thread.sleep(200);
|
||||
}
|
||||
fail("Expected numSlices=" + numSlices + " numReplicas=" + numReplicas + " but found " + cloudClient.getZkStateReader().getClusterState().getCollection(collection));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,191 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.cloud.ClusterStateMockUtil.buildClusterState;
|
||||
|
||||
public class SharedFSAutoReplicaFailoverUtilsTest extends SolrTestCaseJ4 {
|
||||
private static final String NODE6 = "baseUrl6_";
|
||||
private static final String NODE6_URL = "http://baseUrl6";
|
||||
|
||||
private static final String NODE5 = "baseUrl5_";
|
||||
private static final String NODE5_URL = "http://baseUrl5";
|
||||
|
||||
private static final String NODE4 = "baseUrl4_";
|
||||
private static final String NODE4_URL = "http://baseUrl4";
|
||||
|
||||
private static final String NODE3 = "baseUrl3_";
|
||||
private static final String NODE3_URL = "http://baseUrl3";
|
||||
|
||||
private static final String NODE2 = "baseUrl2_";
|
||||
private static final String NODE2_URL = "http://baseUrl2";
|
||||
|
||||
private static final String NODE1 = "baseUrl1_";
|
||||
private static final String NODE1_URL = "http://baseUrl1";
|
||||
|
||||
private List<ClusterStateMockUtil.Result> results;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
results = new ArrayList<>();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
for (ClusterStateMockUtil.Result result : results) {
|
||||
result.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBestCreateUrlBasics() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr1R*r2", NODE1);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull("Should be no live node to failover to", createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr1R*r2", NODE1, NODE2);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull("Only failover candidate node already has a replica", createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr1R*r2sr3", NODE1, NODE2, NODE3);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals("Node3 does not have a replica from the bad slice and should be the best choice", NODE3_URL, createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr1R*r2Fsr3r4r5", NODE1, NODE2, NODE3);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertTrue(createUrl.equals(NODE3_URL));
|
||||
|
||||
result = buildClusterState(results, "csr1*r2r3sr3r3sr4", NODE1, NODE2, NODE3, NODE4);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE4_URL, createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr1*r2sr3r3sr4sr4", NODE1, NODE2, NODE3, NODE4);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertTrue(createUrl.equals(NODE3_URL) || createUrl.equals(NODE4_URL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBestCreateUrlMultipleCollections() throws Exception {
|
||||
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr*r2csr2", NODE1);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*r2csr2", NODE1);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*r2csr2", NODE1, NODE2);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull(createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBestCreateUrlMultipleCollections2() {
|
||||
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr*r2sr3cr2", NODE1);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*r2sr3cr2", NODE1, NODE2, NODE3);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE3_URL, createUrl);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetBestCreateUrlMultipleCollections3() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr5r1sr4r2sr3r6csr2*r6sr5r3sr4r3", NODE1, NODE4, NODE5, NODE6);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE1_URL, createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBestCreateUrlMultipleCollections4() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr1r4sr3r5sr2r6csr5r6sr4r6sr5*r4", NODE6);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE6_URL, createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailOverToEmptySolrInstance() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr1*r1sr1csr1", NODE2);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE2_URL, createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFavorForeignSlices() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr*sr2csr3r3", NODE2, NODE3);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE3_URL, createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*sr2csr3r3r3r3r3r3r3", NODE2, NODE3);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE2_URL, createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCollectionMaxNodesPerShard() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr*sr2", 1, 1, NODE2);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*sr2", 1, 2, NODE2);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE2_URL, createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*csr2r2", 1, 1, NODE2);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, null);
|
||||
assertEquals(NODE2_URL, createUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxCoresPerNode() {
|
||||
ClusterStateMockUtil.Result result = buildClusterState(results, "csr*sr2", 1, 1, NODE2);
|
||||
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 1);
|
||||
assertNull(createUrl);
|
||||
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 2);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*sr2", 1, 2, NODE2);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 2);
|
||||
assertEquals(NODE2_URL, createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*sr2sr3sr4", 1, 1, NODE2, NODE3, NODE4);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 1);
|
||||
assertNull(createUrl);
|
||||
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 2);
|
||||
assertNull(createUrl);
|
||||
|
||||
result = buildClusterState(results, "csr*sr2sr3sr4", 1, 2, NODE2, NODE3, NODE4);
|
||||
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica, 2);
|
||||
assertTrue(createUrl.equals(NODE3_URL) || createUrl.equals(NODE4_URL));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.MapSolrParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
||||
private static final String COLLECTION1 = "testSimple1";
|
||||
private static final String COLLECTION2 = "testSimple2";
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
configureCluster(3)
|
||||
.addConfig("conf", configset("cloud-minimal"))
|
||||
.configure();
|
||||
JettySolrRunner jetty1 = cluster.getJettySolrRunner(0);
|
||||
JettySolrRunner jetty2 = cluster.getJettySolrRunner(1);
|
||||
JettySolrRunner jetty3 = cluster.getJettySolrRunner(2);
|
||||
CollectionAdminRequest.createCollection(COLLECTION1, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(true)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty2.getNodeName()+","+jetty3.getNodeName())
|
||||
.setAutoAddReplicas(false)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
// the number of cores in jetty1 (5) will be larger than jetty3 (1)
|
||||
CollectionAdminRequest.createCollection("testSimple3", "conf", 3, 1)
|
||||
.setCreateNodeSet(jetty1.getNodeName())
|
||||
.setAutoAddReplicas(false)
|
||||
.setMaxShardsPerNode(3)
|
||||
.process(cluster.getSolrClient());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimple() throws Exception{
|
||||
JettySolrRunner jetty2 = cluster.getJettySolrRunner(1);
|
||||
JettySolrRunner jetty3 = cluster.getJettySolrRunner(2);
|
||||
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
|
||||
// start the tests
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? cluster.getJettySolrRunner(0) : cluster.getJettySolrRunner(1);
|
||||
String lostNodeName = lostJetty.getNodeName();
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION1, zkStateReader, lostNodeName);
|
||||
lostJetty.stop();
|
||||
waitForNodeLeave(lostNodeName);
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 2));
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION1);
|
||||
lostJetty.start();
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 30000));
|
||||
|
||||
// check cluster property is considered
|
||||
disableAutoAddReplicasInCluster();
|
||||
lostNodeName = jetty3.getNodeName();
|
||||
jetty3.stop();
|
||||
waitForNodeLeave(lostNodeName);
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 1));
|
||||
jetty3.start();
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 30000));
|
||||
enableAutoAddReplicasInCluster();
|
||||
|
||||
|
||||
// test for multiple collections
|
||||
new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) {
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("collection", COLLECTION2);
|
||||
params.set("autoAddReplicas", true);
|
||||
return params;
|
||||
}
|
||||
}.process(cluster.getSolrClient());
|
||||
|
||||
lostNodeName = jetty2.getNodeName();
|
||||
replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION2, zkStateReader, lostNodeName);
|
||||
jetty2.stop();
|
||||
waitForNodeLeave(lostNodeName);
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 2));
|
||||
waitForState("Waiting for collection " + COLLECTION2, COLLECTION2, clusterShape(2, 2));
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION2);
|
||||
|
||||
// overseer failover test..
|
||||
}
|
||||
|
||||
private void disableAutoAddReplicasInCluster() throws SolrServerException, IOException {
|
||||
Map m = makeMap(
|
||||
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
||||
"name", ZkStateReader.AUTO_ADD_REPLICAS,
|
||||
"val", "false");
|
||||
QueryRequest request = new QueryRequest(new MapSolrParams(m));
|
||||
request.setPath("/admin/collections");
|
||||
cluster.getSolrClient().request(request);
|
||||
}
|
||||
|
||||
private void enableAutoAddReplicasInCluster() throws SolrServerException, IOException {
|
||||
Map m = makeMap(
|
||||
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
||||
"name", ZkStateReader.AUTO_ADD_REPLICAS);
|
||||
QueryRequest request = new QueryRequest(new MapSolrParams(m));
|
||||
request.setPath("/admin/collections");
|
||||
cluster.getSolrClient().request(request);
|
||||
}
|
||||
|
||||
private void checkSharedFsReplicasMovedCorrectly(List<Replica> replacedHdfsReplicas, ZkStateReader zkStateReader, String collection){
|
||||
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
|
||||
for (Replica replica :replacedHdfsReplicas) {
|
||||
boolean found = false;
|
||||
String dataDir = replica.getStr("dataDir");
|
||||
String ulogDir = replica.getStr("ulogDir");
|
||||
for (Replica replica2 : docCollection.getReplicas()) {
|
||||
if (dataDir.equals(replica2.getStr("dataDir")) && ulogDir.equals(replica2.getStr("ulogDir"))) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) fail("Can not found a replica with same dataDir and ulogDir as " + replica + " from:" + docCollection.getReplicas());
|
||||
}
|
||||
}
|
||||
|
||||
private List<Replica> getReplacedSharedFsReplicas(String collection, ZkStateReader zkStateReader, String lostNodeName) {
|
||||
List<Replica> replacedHdfsReplicas = new ArrayList<>();
|
||||
for (Replica replica : zkStateReader.getClusterState().getCollection(collection).getReplicas()) {
|
||||
String dataDir = replica.getStr("dataDir");
|
||||
if (replica.getNodeName().equals(lostNodeName) && dataDir != null) {
|
||||
replacedHdfsReplicas.add(replica);
|
||||
}
|
||||
}
|
||||
|
||||
return replacedHdfsReplicas;
|
||||
}
|
||||
|
||||
private void waitForNodeLeave(String lostNodeName) throws InterruptedException {
|
||||
ZkStateReader reader = cluster.getSolrClient().getZkStateReader();
|
||||
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS);
|
||||
while (reader.getClusterState().getLiveNodes().contains(lostNodeName)) {
|
||||
Thread.sleep(100);
|
||||
if (timeOut.hasTimedOut()) fail("Wait for " + lostNodeName + " to leave failed!");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,6 +31,8 @@ import org.apache.solr.cloud.CloudDescriptor;
|
|||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
|
@ -114,6 +116,25 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{
|
|||
|
||||
operations = getOperations(jetty3, lostNodeName);
|
||||
assertOperations(collection1, operations, lostNodeName, cloudDescriptors, jetty3);
|
||||
|
||||
lostJetty.start();
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 30000));
|
||||
|
||||
new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) {
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("collection", collection1);
|
||||
params.set("autoAddReplicas", false);
|
||||
return params;
|
||||
}
|
||||
}.process(cluster.getSolrClient());
|
||||
lostJetty = jetty1;
|
||||
lostNodeName = lostJetty.getNodeName();
|
||||
lostJetty.stop();
|
||||
waitForNodeLeave(lostNodeName);
|
||||
operations = getOperations(jetty3, lostNodeName);
|
||||
assertNull(operations);
|
||||
}
|
||||
|
||||
private void waitForNodeLeave(String lostNodeName) throws InterruptedException {
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.autoscaling;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.solr.cloud.MoveReplicaHDFSTest;
|
||||
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import static org.apache.lucene.util.LuceneTestCase.createTempDir;
|
||||
|
||||
@LuceneTestCase.Slow
|
||||
@ThreadLeakFilters(defaultFilters = true, filters = {
|
||||
BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
|
||||
MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
|
||||
})
|
||||
public class HdfsAutoAddReplicasIntegrationTest extends AutoAddReplicasIntegrationTest {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||
System.setProperty("solr.hdfs.blockcache.blocksperbank", "2048");
|
||||
|
||||
ZkConfigManager configManager = new ZkConfigManager(zkClient());
|
||||
configManager.uploadConfigDir(configset("cloud-hdfs"), "conf");
|
||||
|
||||
System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownClass() throws Exception {
|
||||
cluster.shutdown(); // need to close before the MiniDFSCluster
|
||||
HdfsTestUtil.teardownClass(dfsCluster);
|
||||
dfsCluster = null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.cloud.autoscaling;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
|
||||
public class NoneSuggester extends Policy.Suggester{
|
||||
@Override
|
||||
SolrRequest init() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrRequest getOperation() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -253,16 +253,5 @@ public class ClusterStateUtil {
|
|||
|
||||
return success;
|
||||
}
|
||||
|
||||
public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) {
|
||||
ClusterState clusterState = reader.getClusterState();
|
||||
if (clusterState != null) {
|
||||
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
|
||||
if (docCollection != null) {
|
||||
return docCollection.getAutoAddReplicas();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -509,6 +509,135 @@ public class TestPolicy extends SolrTestCaseJ4 {
|
|||
|
||||
}
|
||||
|
||||
|
||||
public void testMoveReplicasInMultipleCollections() {
|
||||
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
|
||||
"node1:{cores:2}," +
|
||||
"node3:{cores:4}" +
|
||||
"}");
|
||||
String clusterState = "{\n" +
|
||||
"'collection1' : {\n" +
|
||||
" 'pullReplicas':'0',\n" +
|
||||
" 'replicationFactor':'2',\n" +
|
||||
" 'shards':{\n" +
|
||||
" 'shard1':{\n" +
|
||||
" 'range':'80000000-ffffffff',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'replicas':{\n" +
|
||||
" 'core_node1':{\n" +
|
||||
" 'core':'collection1_shard1_replica_n1',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51650/solr',\n" +
|
||||
" 'node_name':'node1',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT',\n" +
|
||||
" 'leader':'true'},\n" +
|
||||
" 'core_node6':{\n" +
|
||||
" 'core':'collection1_shard1_replica_n3',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51651/solr',\n" +
|
||||
" 'node_name':'node3',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT'}}},\n" +
|
||||
" 'shard2':{\n" +
|
||||
" 'range':'0-7fffffff',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'replicas':{\n" +
|
||||
" 'core_node3':{\n" +
|
||||
" 'core':'collection1_shard2_replica_n1',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51650/solr',\n" +
|
||||
" 'node_name':'node1',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT',\n" +
|
||||
" 'leader':'true'},\n" +
|
||||
" 'core_node5':{\n" +
|
||||
" 'core':'collection1_shard2_replica_n3',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51651/solr',\n" +
|
||||
" 'node_name':'node3',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT'}}}},\n" +
|
||||
" 'router':{'name':'compositeId'},\n" +
|
||||
" 'maxShardsPerNode':'2',\n" +
|
||||
" 'autoAddReplicas':'true',\n" +
|
||||
" 'nrtReplicas':'2',\n" +
|
||||
" 'tlogReplicas':'0'},\n" +
|
||||
"'collection2' : {\n" +
|
||||
" 'pullReplicas':'0',\n" +
|
||||
" 'replicationFactor':'2',\n" +
|
||||
" 'shards':{\n" +
|
||||
" 'shard1':{\n" +
|
||||
" 'range':'80000000-ffffffff',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'replicas':{\n" +
|
||||
" 'core_node1':{\n" +
|
||||
" 'core':'collection2_shard1_replica_n1',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51649/solr',\n" +
|
||||
" 'node_name':'node2',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT'},\n" +
|
||||
" 'core_node2':{\n" +
|
||||
" 'core':'collection2_shard1_replica_n2',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51651/solr',\n" +
|
||||
" 'node_name':'node3',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT',\n" +
|
||||
" 'leader':'true'}}},\n" +
|
||||
" 'shard2':{\n" +
|
||||
" 'range':'0-7fffffff',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'replicas':{\n" +
|
||||
" 'core_node3':{\n" +
|
||||
" 'core':'collection2_shard2_replica_n1',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51649/solr',\n" +
|
||||
" 'node_name':'node2',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT'},\n" +
|
||||
" 'core_node4':{\n" +
|
||||
" 'core':'collection2_shard2_replica_n2',\n" +
|
||||
" 'base_url':'http://127.0.0.1:51651/solr',\n" +
|
||||
" 'node_name':'node3',\n" +
|
||||
" 'state':'active',\n" +
|
||||
" 'type':'NRT',\n" +
|
||||
" 'leader':'true'}}}},\n" +
|
||||
" 'router':{'name':'compositeId'},\n" +
|
||||
" 'maxShardsPerNode':'2',\n" +
|
||||
" 'autoAddReplicas':'true',\n" +
|
||||
" 'nrtReplicas':'2',\n" +
|
||||
" 'tlogReplicas':'0'}\n" +
|
||||
"}";
|
||||
Policy policy = new Policy(new HashMap<>());
|
||||
Policy.Suggester suggester = policy.createSession(getClusterDataProvider(nodeValues, clusterState))
|
||||
.getSuggester(MOVEREPLICA)
|
||||
.hint(Hint.COLL, "collection1")
|
||||
.hint(Hint.COLL, "collection2")
|
||||
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
|
||||
SolrRequest op = suggester.getOperation();
|
||||
assertNotNull(op);
|
||||
assertEquals("collection2", op.getParams().get("collection"));
|
||||
assertEquals("node1", op.getParams().get("targetNode"));
|
||||
String coreNodeName = op.getParams().get("replica");
|
||||
assertTrue(coreNodeName.equals("core_node3") || coreNodeName.equals("core_node1"));
|
||||
|
||||
suggester = suggester.getSession()
|
||||
.getSuggester(MOVEREPLICA)
|
||||
.hint(Hint.COLL, "collection1")
|
||||
.hint(Hint.COLL, "collection2")
|
||||
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
|
||||
op = suggester.getOperation();
|
||||
assertNotNull(op);
|
||||
assertEquals("collection2", op.getParams().get("collection"));
|
||||
assertEquals("node1", op.getParams().get("targetNode"));
|
||||
coreNodeName = op.getParams().get("replica");
|
||||
assertTrue(coreNodeName.equals("core_node3") || coreNodeName.equals("core_node1"));
|
||||
|
||||
suggester = suggester.getSession()
|
||||
.getSuggester(MOVEREPLICA)
|
||||
.hint(Hint.COLL, "collection1")
|
||||
.hint(Hint.COLL, "collection2")
|
||||
.hint(Policy.Suggester.Hint.SRC_NODE, "node2");
|
||||
op = suggester.getOperation();
|
||||
assertNull(op);
|
||||
}
|
||||
|
||||
|
||||
public void testMultipleCollections() {
|
||||
Map policies = (Map) Utils.fromJSONString("{" +
|
||||
" 'cluster-preferences': [" +
|
||||
|
|
Loading…
Reference in New Issue