SOLR-5656: Add autoAddReplicas feature for shared file systems.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1617919 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2014-08-14 12:32:27 +00:00
parent c8e9209383
commit f25288764a
59 changed files with 2139 additions and 286 deletions

View File

@@ -191,6 +191,8 @@ New Features
* SOLR-2894: Distributed query support for facet.pivot (Dan Cooper, Erik Hatcher, Chris Russell,
Andrew Muldowney, Brett Lucey, Mark Miller, hossman)
* SOLR-5656: Add autoAddReplicas feature for shared file systems. (Mark Miller, Gregory Chanan)
Bug Fixes
----------------------

View File

@@ -340,8 +340,8 @@ public class TestSolrEntityProcessorEndToEnd extends AbstractDataImportHandlerTe
}
private JettySolrRunner createJetty(SolrInstance instance) throws Exception {
System.setProperty("solr.data.dir", instance.getDataDir());
JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0, null, null, true, null, sslConfig);
jetty.setDataDir(instance.getDataDir());
jetty.start();
return jetty;
}

View File

@@ -22,6 +22,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +39,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
public class Assign {
@@ -167,7 +167,7 @@ public class Assign {
if (repFactor > nodeNameVsShardCount.size()) {
log.warn("Specified "
+ REPLICATION_FACTOR
+ ZkStateReader.REPLICATION_FACTOR
+ " of "
+ repFactor
+ " on collection "
@@ -186,7 +186,7 @@
+ ", and the number of live nodes is " + nodeList.size()
+ ". This allows a maximum of " + maxCoresAllowedToCreate
+ " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ " and value of " + REPLICATION_FACTOR + " is " + repFactor
+ " and value of " + ZkStateReader.REPLICATION_FACTOR + " is " + repFactor
+ ". This requires " + requestedCoresToCreate
+ " shards to be created (higher than the allowed number)");
}

View File

@@ -0,0 +1,87 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CloudUtil {
protected static Logger log = LoggerFactory.getLogger(CloudUtil.class);
/**
* See if this core's coreNodeName has been taken over by another baseUrl;
* if it has, unload the local core and throw an exception.
*/
public static void checkSharedFSFailoverReplaced(CoreContainer cc, CoreDescriptor desc) {
ZkController zkController = cc.getZkController();
String thisCnn = zkController.getCoreNodeName(desc);
String thisBaseUrl = zkController.getBaseUrl();
log.debug("checkSharedFSFailoverReplaced running for coreNodeName={} baseUrl={}", thisCnn, thisBaseUrl);
// if we see our core node name on a different base url, unload
Map<String,Slice> slicesMap = zkController.getClusterState().getSlicesMap(desc.getCloudDescriptor().getCollectionName());
if (slicesMap != null) {
for (Slice slice : slicesMap.values()) {
for (Replica replica : slice.getReplicas()) {
String cnn = replica.getName();
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
log.debug("compare against coreNodeName={} baseUrl={}", cnn, baseUrl);
if (thisCnn != null && thisCnn.equals(cnn)
&& !thisBaseUrl.equals(baseUrl)) {
if (cc.getCoreNames().contains(desc.getName())) {
cc.unload(desc.getName());
}
File instanceDir = new File(desc.getInstanceDir());
try {
FileUtils.deleteDirectory(instanceDir);
} catch (IOException e) {
SolrException.log(log, "Failed to delete instance dir for core:"
+ desc.getName() + " dir:" + instanceDir.getAbsolutePath());
}
log.error("", new SolrException(ErrorCode.SERVER_ERROR,
"Will not load SolrCore " + desc.getName()
+ " because it has been replaced due to failover."));
throw new SolrException(ErrorCode.SERVER_ERROR,
"Will not load SolrCore " + desc.getName()
+ " because it has been replaced due to failover.");
}
}
}
}
}
}

View File

@@ -1,5 +1,9 @@
package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -46,7 +50,7 @@ import java.util.concurrent.TimeUnit;
* limitations under the License.
*/
public abstract class ElectionContext {
public abstract class ElectionContext implements Closeable {
static Logger log = LoggerFactory.getLogger(ElectionContext.class);
final String electionPath;
final ZkNodeProps leaderProps;
@@ -64,7 +68,9 @@ public abstract class ElectionContext {
this.zkClient = zkClient;
}
public void close() {}
public void close() {
}
public void cancelElection() throws InterruptedException, KeeperException {
if( leaderSeqPath != null ){
@@ -178,6 +184,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
@Override
public void close() {
super.close();
this.isClosed = true;
syncStrategy.close();
}
@@ -535,6 +542,7 @@ final class OverseerElectionContext extends ElectionContext {
overseer.start(id);
}
@Override
public void cancelElection() throws InterruptedException, KeeperException {
super.cancelElection();
overseer.close();

View File

@@ -22,6 +22,8 @@ import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -38,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -50,7 +51,10 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.IOUtils;
import org.apache.solr.util.stats.Clock;
import org.apache.solr.util.stats.Timer;
import org.apache.solr.util.stats.TimerContext;
@@ -62,7 +66,7 @@ import org.slf4j.LoggerFactory;
/**
* Cluster leader. Responsible for node assignments and for maintaining cluster state.
*/
public class Overseer {
public class Overseer implements Closeable {
public static final String QUEUE_OPERATION = "operation";
public static final String DELETECORE = "deletecore";
public static final String REMOVECOLLECTION = "removecollection";
@@ -82,7 +86,7 @@ public class Overseer {
private long lastUpdatedTime = 0;
private class ClusterStateUpdater implements Runnable, ClosableThread {
private class ClusterStateUpdater implements Runnable, Closeable {
private final ZkStateReader reader;
private final SolrZkClient zkClient;
@@ -1112,11 +1116,6 @@ public class Overseer {
this.isClosed = true;
}
@Override
public boolean isClosed() {
return this.isClosed;
}
}
static void getShardNames(Integer numShards, List<String> shardNames) {
@@ -1141,104 +1140,135 @@ public class Overseer {
}
class OverseerThread extends Thread implements ClosableThread {
class OverseerThread extends Thread implements Closeable {
protected volatile boolean isClosed;
private ClosableThread thread;
private Closeable thread;
public OverseerThread(ThreadGroup tg, ClosableThread thread) {
public OverseerThread(ThreadGroup tg, Closeable thread) {
super(tg, (Runnable) thread);
this.thread = thread;
}
public OverseerThread(ThreadGroup ccTg, ClosableThread thread, String name) {
public OverseerThread(ThreadGroup ccTg, Closeable thread, String name) {
super(ccTg, (Runnable) thread, name);
this.thread = thread;
}
@Override
public void close() {
public void close() throws IOException {
thread.close();
this.isClosed = true;
}
@Override
public boolean isClosed() {
return this.isClosed;
}
}
private volatile OverseerThread ccThread;
private OverseerThread ccThread;
private volatile OverseerThread updaterThread;
private OverseerThread updaterThread;
private ZkStateReader reader;
private OverseerThread arfoThread;
private ShardHandler shardHandler;
private final ZkStateReader reader;
private String adminPath;
private final ShardHandler shardHandler;
private final UpdateShardHandler updateShardHandler;
private final String adminPath;
private OverseerCollectionProcessor overseerCollectionProcessor;
private ZkController zkController;
private Stats stats;
private String id;
private boolean closed;
private ConfigSolr config;
// overseer not responsible for closing reader
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, ZkController zkController) throws KeeperException, InterruptedException {
public Overseer(ShardHandler shardHandler,
UpdateShardHandler updateShardHandler, String adminPath,
final ZkStateReader reader, ZkController zkController, ConfigSolr config)
throws KeeperException, InterruptedException {
this.reader = reader;
this.shardHandler = shardHandler;
this.updateShardHandler = updateShardHandler;
this.adminPath = adminPath;
this.zkController = zkController;
this.stats = new Stats();
this.config = config;
}
public void start(String id) {
close();
public synchronized void start(String id) {
this.id = id;
closed = false;
doClose();
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats));
updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats), "OverseerStateUpdate-" + id);
updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "Overseer-" + id);
ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
ccThread.setDaemon(true);
ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler);
arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id);
arfoThread.setDaemon(true);
updaterThread.start();
ccThread.start();
arfoThread.start();
}
public OverseerThread getUpdaterThread() {
/**
* For tests.
*
* @lucene.internal
* @return state updater thread
*/
public synchronized OverseerThread getUpdaterThread() {
return updaterThread;
}
public void close() {
try {
public synchronized void close() {
if (closed) return;
log.info("Overseer (id=" + id + ") closing");
doClose();
this.closed = true;
}
private void doClose() {
if (updaterThread != null) {
try {
updaterThread.close();
IOUtils.closeQuietly(updaterThread);
updaterThread.interrupt();
} catch (Exception e) {
log.error("Error closing updaterThread", e);
}
}
} finally {
if (ccThread != null) {
try {
ccThread.close();
IOUtils.closeQuietly(ccThread);
ccThread.interrupt();
} catch (Exception e) {
log.error("Error closing ccThread", e);
}
}
if (arfoThread != null) {
IOUtils.closeQuietly(arfoThread);
arfoThread.interrupt();
}
updaterThread = null;
ccThread = null;
arfoThread = null;
}
/**

View File

@@ -0,0 +1,501 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.update.UpdateShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
// TODO: how to tmp exclude nodes?
// TODO: more fine grained failover rules?
// TODO: test with lots of collections
// TODO: add config for only failover if replicas is < N
// TODO: general support for non shared filesystems
// this is specialized for a shared file system, but it should
// not be much work to generalize
// NOTE: using replication can slow down failover if a whole
// shard is lost.
/**
*
* In this simple initial implementation we are limited in how quickly we detect
* a failure by a worst case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS + WORK_LOOP_DELAY_MS
* and best case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS. Also, consider the time to
* create the SolrCore, do any recovery necessary, and warm up the readers.
*
* NOTE: this will only work with collections created via the collections api because they will have defined
* replicationFactor and maxShardsPerNode.
*
* @lucene.experimental
*/
public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
private static Logger log = LoggerFactory.getLogger(OverseerAutoReplicaFailoverThread.class);
private Integer lastClusterStateVersion;
private final ExecutorService updateExecutor;
private volatile boolean isClosed;
private ZkStateReader zkStateReader;
private final Cache<String,Long> baseUrlForBadNodes;
private final int workLoopDelay;
private final int waitAfterExpiration;
public OverseerAutoReplicaFailoverThread(ConfigSolr config, ZkStateReader zkStateReader,
UpdateShardHandler updateShardHandler) {
this.zkStateReader = zkStateReader;
this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
log.info(
"Starting "
+ this.getClass().getSimpleName()
+ " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
workLoopDelay, waitAfterExpiration, badNodeExpiration);
baseUrlForBadNodes = CacheBuilder.newBuilder()
.concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
// TODO: Speed up our work loop when live_nodes changes??
updateExecutor = updateShardHandler.getUpdateExecutor();
// TODO: perhaps do a health ping periodically to each node (scaryish)
// And/OR work on JIRA issue around self health checks (SOLR-5805)
}
@Override
public void run() {
while (!this.isClosed) {
// work loop
log.debug("do " + this.getClass().getSimpleName() + " work loop");
// every n, look at state and make add / remove calls
try {
doWork();
} catch (Exception e) {
SolrException.log(log, this.getClass().getSimpleName()
+ " had an error it's thread work loop.", e);
}
if (!this.isClosed) {
try {
Thread.sleep(workLoopDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
private void doWork() {
// TODO: extract to configurable strategy class ??
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
if (lastClusterStateVersion == clusterState.getZkClusterStateVersion() && baseUrlForBadNodes.size() == 0) {
// nothing has changed, no work to do
return;
}
lastClusterStateVersion = clusterState.getZkClusterStateVersion();
Set<String> collections = clusterState.getCollections();
for (final String collection : collections) {
DocCollection docCollection = clusterState.getCollection(collection);
if (!docCollection.getAutoAddReplicas()) {
continue;
}
if (docCollection.getReplicationFactor() == null) {
log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
continue;
}
log.debug("Found collection, name={} replicationFactor=", collection, docCollection.getReplicationFactor());
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
if (slice.getState().equals(Slice.ACTIVE)) {
final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
log.debug("replicationFactor={} goodReplicaCount={}", docCollection.getReplicationFactor(), goodReplicas);
if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
// badReplicaMap.put(collection, badReplicas);
processBadReplicas(collection, downReplicas);
} else if (goodReplicas > docCollection.getReplicationFactor()) {
log.debug("There are too many replicas");
}
}
}
}
}
}
private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
for (DownReplica badReplica : badReplicas) {
log.debug("process down replica {}", badReplica.replica.getName());
String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
if (wentBadAtNS == null) {
log.warn("Replica {} may need to failover.",
badReplica.replica.getName());
baseUrlForBadNodes.put(baseUrl, System.nanoTime());
} else {
long elapsed = System.nanoTime() - wentBadAtNS;
if (elapsed < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
// protect against ZK 'flapping', startup and shutdown
log.debug("Looks troublesome...continue. Elapsed={}", elapsed + "ns");
} else {
log.debug("We need to add a replica. Elapsed={}", elapsed + "ns");
if (addReplica(collection, badReplica)) {
baseUrlForBadNodes.invalidate(baseUrl);
}
}
}
}
}
private boolean addReplica(final String collection, DownReplica badReplica) {
// first find best home - first strategy, sort by number of cores
// hosted where maxCoresPerNode is not violated
final String createUrl = getBestCreateUrl(zkStateReader, badReplica);
if (createUrl == null) {
log.warn("Could not find a node to create new replica on.");
return false;
}
// NOTE: we send the absolute path, which will slightly change
// behavior of these cores as they won't respond to changes
// in the solr.hdfs.home sys prop as they would have.
final String dataDir = badReplica.replica.getStr("dataDir");
final String ulogDir = badReplica.replica.getStr("ulogDir");
final String coreNodeName = badReplica.replica.getName();
if (dataDir != null) {
// needs to be an async request - if a full shard goes down, the create can block on leader election
final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
log.debug("submit call to {}", createUrl);
updateExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
return createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName);
}
});
// wait to see state for core we just created
boolean success = ClusterStateUtil.waitToSeeLive(zkStateReader, collection, coreNodeName, createUrl, 30);
if (!success) {
log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
return false;
}
return true;
}
log.warn("Could not find dataDir or ulogDir in cluster state.");
return false;
}
private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
int goodReplicas = 0;
Collection<Replica> replicas = slice.getReplicas();
if (replicas != null) {
for (Replica replica : replicas) {
// on a live node?
boolean live = clusterState.liveNodesContain(replica.getNodeName());
String state = replica.getStr(ZkStateReader.STATE_PROP);
boolean okayState = (state.equals(ZkStateReader.DOWN)
|| state.equals(ZkStateReader.RECOVERING)
|| state.equals(ZkStateReader.ACTIVE));
log.debug("Process replica name={} live={} state={}", replica.getName(), live, state);
if (live && okayState) {
goodReplicas++;
} else {
DownReplica badReplica = new DownReplica();
badReplica.replica = replica;
badReplica.slice = slice;
badReplica.collection = collection;
badReplicas.add(badReplica);
}
}
}
log.debug("bad replicas for slice {}", badReplicas);
return goodReplicas;
}
/**
*
* @return the best node to replace the badReplica on or null if there is no
* such node
*/
static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica) {
assert badReplica != null;
assert badReplica.collection != null;
assert badReplica.slice != null;
Map<String,Counts> counts = new HashMap<>();
ValueComparator vc = new ValueComparator(counts);
Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
Set<String> collections = clusterState.getCollections();
for (String collection : collections) {
log.debug("look at collection {} as possible create candidate", collection);
DocCollection docCollection = clusterState.getCollection(collection);
// TODO - only operate on collections with sharedfs failover = true ??
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
// only look at active shards
if (slice.getState().equals(Slice.ACTIVE)) {
log.debug("look at slice {} as possible create candidate", slice.getName());
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
liveNodes.remove(replica.getNodeName());
if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(
badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
continue;
}
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
// on a live node?
log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes());
boolean live = clusterState.liveNodesContain(replica.getNodeName());
log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live);
if (live) {
Counts cnt = counts.get(baseUrl);
if (cnt == null) {
cnt = new Counts();
}
if (badReplica.collection.getName().equals(collection)) {
cnt.negRankingWeight += 3;
cnt.collectionShardsOnNode += 1;
} else {
cnt.negRankingWeight += 1;
}
if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
cnt.ourReplicas++;
}
// TODO: this is collection wide and we want to take into
// account cluster wide - use new cluster sys prop
int maxShardsPerNode = docCollection.getMaxShardsPerNode();
log.debug("max shards per node={} good replicas={}", maxShardsPerNode, cnt);
Collection<Replica> badSliceReplicas = null;
DocCollection c = clusterState.getCollection(badReplica.collection.getName());
if (c != null) {
Slice s = c.getSlice(badReplica.slice.getName());
if (s != null) {
badSliceReplicas = s.getReplicas();
}
}
boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
if (alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP));
} else {
counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt);
}
}
}
}
}
}
}
for (String node : liveNodes) {
counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
}
if (counts.size() == 0) {
return null;
}
Map<String,Counts> sortedCounts = new TreeMap<>(vc);
sortedCounts.putAll(counts);
log.debug("empty nodes={}", liveNodes);
log.debug("sorted hosts={}", sortedCounts);
return sortedCounts.keySet().iterator().next();
}
private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
if (replicas != null) {
log.debug("check if replica already exists on node using replicas {}", getNames(replicas));
for (Replica replica : replicas) {
if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
&& clusterState.liveNodesContain(replica.getNodeName())
&& (replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.ACTIVE)
|| replica.getStr(ZkStateReader.STATE_PROP).equals(
ZkStateReader.DOWN) || replica.getStr(
ZkStateReader.STATE_PROP).equals(ZkStateReader.RECOVERING))) {
log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName());
return true;
}
}
}
log.debug("replica does not yet exist on node: {}", baseUrl);
return false;
}
private static Object getNames(Collection<Replica> replicas) {
Set<String> names = new HashSet<>(replicas.size());
for (Replica replica : replicas) {
names.add(replica.getName());
}
return names;
}
private boolean createSolrCore(final String collection,
final String createUrl, final String dataDir, final String ulogDir,
final String coreNodeName, final String coreName) {
HttpSolrServer server = null;
try {
log.debug("create url={}", createUrl);
server = new HttpSolrServer(createUrl);
server.setConnectionTimeout(30000);
server.setSoTimeout(60000);
Create createCmd = new Create();
createCmd.setCollection(collection);
createCmd.setCoreNodeName(coreNodeName);
// TODO: how do we ensure unique coreName
// for now, the collections API will use unique names
createCmd.setCoreName(coreName);
createCmd.setDataDir(dataDir);
createCmd.setUlogDir(ulogDir);
server.request(createCmd);
} catch (Exception e) {
SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
return false;
} finally {
if (server != null) {
server.shutdown();
}
}
return true;
}
private static class ValueComparator implements Comparator<String> {
Map<String,Counts> map;
public ValueComparator(Map<String,Counts> map) {
this.map = map;
}
public int compare(String a, String b) {
if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
return 1;
} else {
return -1;
}
}
}
@Override
public void close() {
isClosed = true;
}
public boolean isClosed() {
return isClosed;
}
private static class Counts {
int collectionShardsOnNode = 0;
int negRankingWeight = 0;
int ourReplicas = 0;
private Counts() {
}
private Counts(int totalReplicas, int ourReplicas) {
this.negRankingWeight = totalReplicas;
this.ourReplicas = ourReplicas;
}
@Override
public String toString() {
return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
+ ourReplicas + "]";
}
}
static class DownReplica {
Replica replica;
Slice slice;
DocCollection collection;
@Override
public String toString() {
return "DownReplica [replica=" + replica.getName() + ", slice="
+ slice.getName() + ", collection=" + collection.getName() + "]";
}
}
}
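The detection window described in the class javadoc above can be bounded with simple arithmetic. A rough sketch follows, assuming an illustrative 15 second ZooKeeper session timeout (not part of this commit); the other two values are the ConfigSolr defaults added further down:

// Rough failure-detection bounds per the OverseerAutoReplicaFailoverThread javadoc.
// zkSessionTimeoutMs is an assumed example value; the other two are ConfigSolr defaults.
long zkSessionTimeoutMs = 15000;    // assumed cluster setting
long waitAfterExpirationMs = 30000; // DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION
long workLoopDelayMs = 10000;       // DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY
long bestCaseMs = zkSessionTimeoutMs + waitAfterExpirationMs;  // roughly 45 seconds
long worstCaseMs = bestCaseMs + workLoopDelayMs;               // roughly 55 seconds

As the javadoc notes, SolrCore creation, recovery, and searcher warm-up time come on top of these bounds.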

View File

@@ -17,7 +17,34 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import com.google.common.collect.ImmutableSet;
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.LIST;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
@@ -26,12 +53,12 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.Assign.Node;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
@@ -70,42 +97,17 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.Assign.Node;
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.LIST;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import com.google.common.collect.ImmutableSet;
public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public class OverseerCollectionProcessor implements Runnable, Closeable {
public static final String NUM_SLICES = "numShards";
// @Deprecated- see on ZkStateReader
public static final String REPLICATION_FACTOR = "replicationFactor";
// @Deprecated- see on ZkStateReader
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
public static final String CREATE_NODE_SET = "createNodeSet";
@@ -148,9 +150,9 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
public static final Map<String,Object> COLL_PROPS = ZkNodeProps.makeMap(
ROUTER, DocRouter.DEFAULT_NAME,
REPLICATION_FACTOR, "1",
MAX_SHARDS_PER_NODE, "1" );
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
ZkStateReader.AUTO_ADD_REPLICAS, "false");
public ExecutorService tpe ;
@@ -1104,8 +1106,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
DocCollection collection = clusterState.getCollection(collectionName);
int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
int maxShardsPerNode = collection.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1);
int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, collection.getInt(ZkStateReader.REPLICATION_FACTOR, 1));
String createNodeSetStr = message.getStr(CREATE_NODE_SET);
ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
@@ -1878,7 +1880,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
Map<String, Object> props = ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION, CREATECOLLECTION,
"name", tempSourceCollectionName,
REPLICATION_FACTOR, 1,
ZkStateReader.REPLICATION_FACTOR, 1,
NUM_SLICES, 1,
COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
@@ -2080,7 +2082,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, 1);
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
String async = null;
@@ -2100,12 +2102,12 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param");
}
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1);
String createNodeSetStr;
List<String> createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true);
if (repFactor <= 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
throw new SolrException(ErrorCode.BAD_REQUEST, ZkStateReader.REPLICATION_FACTOR + " must be greater than 0");
}
if (numSlices <= 0) {
@@ -2132,7 +2134,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
if (repFactor > nodeList.size()) {
log.warn("Specified "
+ REPLICATION_FACTOR
+ ZkStateReader.REPLICATION_FACTOR
+ " of "
+ repFactor
+ " on collection "
@@ -2146,11 +2148,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
int requestedShardsToCreate = numSlices * repFactor;
if (maxShardsAllowedToCreate < requestedShardsToCreate) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+ MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ ZkStateReader.MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ ", and the number of live nodes is " + nodeList.size()
+ ". This allows a maximum of " + maxShardsAllowedToCreate
+ " to be created. Value of " + NUM_SLICES + " is " + numSlices
+ " and value of " + REPLICATION_FACTOR + " is " + repFactor
+ " and value of " + ZkStateReader.REPLICATION_FACTOR + " is " + repFactor
+ ". This requires " + requestedShardsToCreate
+ " shards to be created (higher than the allowed number)");
}
@@ -2301,7 +2303,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
if (node == null) {
node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(MAX_SHARDS_PER_NODE, 1), coll.getInt(REPLICATION_FACTOR, 1), null).get(0).nodeName;
node = getNodesForNewShard(clusterState, collection, coll.getSlices().size(), coll.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1), coll.getInt(ZkStateReader.REPLICATION_FACTOR, 1), null).get(0).nodeName;
log.info("Node not provided, Identified {} for creating new replica", node);
}
@@ -2504,7 +2506,6 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
}
}
@Override
public boolean isClosed() {
return isClosed;
}

View File

@@ -47,6 +47,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.OnReconnect;
@@ -66,6 +67,7 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -263,6 +265,9 @@ public final class ZkController {
// leaders/overseers
// with connection loss
try {
// unload solrcores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);
register(descriptor.getName(), descriptor, true, true);
} catch (Exception e) {
SolrException.log(log, "Error registering SolrCore", e);
@@ -554,14 +559,18 @@ public final class ZkController {
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
ShardHandler shardHandler;
UpdateShardHandler updateShardHandler;
String adminPath;
shardHandler = cc.getShardHandlerFactory().getShardHandler();
updateShardHandler = cc.getUpdateShardHandler();
adminPath = cc.getAdminPath();
if (!zkRunOnly) {
overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader, this);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
this.overseer = new Overseer(shardHandler, updateShardHandler,
adminPath, zkStateReader, this, cc.getConfig());
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context, false);
}
@@ -1075,17 +1084,36 @@ public final class ZkController {
}
}
//assert cd.getCloudDescriptor().getShardId() != null;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, state,
ZkStateReader.BASE_URL_PROP, getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, cd.getName(),
ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString() : null,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName != null ? coreNodeName : null);
Map<String, Object> props = new HashMap<String, Object>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state);
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
if (numShards != null) {
props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
}
if (coreNodeName != null) {
props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
}
if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) {
try (SolrCore core = cc.getCore(cd.getName())) {
if (core != null && core.getDirectoryFactory().isSharedStorage()) {
props.put("dataDir", core.getDataDir());
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog != null) {
props.put("ulogDir", ulog.getLogDir());
}
}
}
}
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
cd.getCloudDescriptor().lastPublished = state;
}
@@ -1425,7 +1453,6 @@ public final class ZkController {
}
publish(cd, ZkStateReader.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -1941,4 +1968,18 @@ public final class ZkController {
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreNodeName;
}
public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
ClusterState clusterState = getZkStateReader().getClusterState();
if (clusterState != null) {
DocCollection collection = clusterState.getCollectionOrNull(desc
.getCloudDescriptor().getCollectionName());
if (collection != null) {
boolean autoAddReplicas = ClusterStateUtil.isAutoAddReplicas( getZkStateReader(), collection.getName());
if (autoAddReplicas) {
CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
}
}
}
}
}

View File

@@ -134,6 +134,11 @@ public abstract class ConfigSolr {
private static final int DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT = 180000;
private static final int DEFAULT_CORE_LOAD_THREADS = 3;
// TODO: tune defaults
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 30000;
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY = 10000;
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION = 60000;
protected static final String DEFAULT_CORE_ADMIN_PATH = "/admin/cores";
public String getZkHostPort() {
@@ -156,6 +161,18 @@ public abstract class ConfigSolr {
return get(CfgProp.SOLR_LEADERCONFLICTRESOLVEWAIT, DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT);
}
public int getAutoReplicaFailoverWaitAfterExpiration() {
return get(CfgProp.SOLR_AUTOREPLICAFAILOVERWAITAFTEREXPIRATION, DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION);
}
public int getAutoReplicaFailoverWorkLoopDelay() {
return get(CfgProp.SOLR_AUTOREPLICAFAILOVERWORKLOOPDELAY, DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY);
}
public int getAutoReplicaFailoverBadNodeExpiration() {
return get(CfgProp.SOLR_AUTOREPLICAFAILOVERBADNODEEXPIRATION, DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION);
}
public boolean getGenericCoreNodeNames() {
return get(CfgProp.SOLR_GENERICCORENODENAMES, false);
}
@@ -270,6 +287,10 @@ public abstract class ConfigSolr {
SOLR_LEADERCONFLICTRESOLVEWAIT,
SOLR_CONFIGSETBASEDIR,
SOLR_AUTOREPLICAFAILOVERWAITAFTEREXPIRATION,
SOLR_AUTOREPLICAFAILOVERWORKLOOPDELAY,
SOLR_AUTOREPLICAFAILOVERBADNODEEXPIRATION,
//TODO: Remove all of these elements for 5.0
SOLR_PERSISTENT,
SOLR_CORES_DEFAULT_CORE_NAME,

View File

@@ -153,6 +153,9 @@ public class ConfigSolrXml extends ConfigSolr {
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_LEADERVOTEWAIT, "leaderVoteWait");
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_LEADERCONFLICTRESOLVEWAIT, "leaderConflictResolveWait");
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_ZKCLIENTTIMEOUT, "zkClientTimeout");
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_AUTOREPLICAFAILOVERBADNODEEXPIRATION, "autoReplicaFailoverBadNodeExpiration");
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_AUTOREPLICAFAILOVERWAITAFTEREXPIRATION, "autoReplicaFailoverWaitAfterExpiration");
storeConfigPropertyAsInt(s, nl, CfgProp.SOLR_AUTOREPLICAFAILOVERWORKLOOPDELAY, "autoReplicaFailoverWorkLoopDelay");
storeConfigPropertyAsString(s, nl, CfgProp.SOLR_HOST, "host");
storeConfigPropertyAsString(s, nl, CfgProp.SOLR_HOSTCONTEXT, "hostContext");

View File

@@ -142,6 +142,9 @@ public class ConfigSolrXmlOld extends ConfigSolr {
storeConfigPropertyAsString(CfgProp.SOLR_HOSTPORT, "solr/cores/@hostPort");
storeConfigPropertyAsInt(CfgProp.SOLR_LEADERVOTEWAIT, "solr/cores/@leaderVoteWait");
storeConfigPropertyAsBoolean(CfgProp.SOLR_GENERICCORENODENAMES, "solr/cores/@genericCoreNodeNames");
storeConfigPropertyAsInt(CfgProp.SOLR_AUTOREPLICAFAILOVERBADNODEEXPIRATION, "solr/cores/@autoReplicaFailoverBadNodeExpiration");
storeConfigPropertyAsInt(CfgProp.SOLR_AUTOREPLICAFAILOVERWAITAFTEREXPIRATION, "solr/cores/@autoReplicaFailoverWaitAfterExpiration");
storeConfigPropertyAsInt(CfgProp.SOLR_AUTOREPLICAFAILOVERWORKLOOPDELAY, "solr/cores/@autoReplicaFailoverWorkLoopDelay");
storeConfigPropertyAsString(CfgProp.SOLR_MANAGEMENTPATH, "solr/cores/@managementPath");
storeConfigPropertyAsBoolean(CfgProp.SOLR_SHARESCHEMA, "solr/cores/@shareSchema");
storeConfigPropertyAsInt(CfgProp.SOLR_TRANSIENTCACHESIZE, "solr/cores/@transientCacheSize");

View File

@@ -17,8 +17,20 @@
package org.apache.solr.core;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -35,19 +47,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
/**
@@ -248,6 +249,9 @@ public class CoreContainer {
creators.add(new Callable<SolrCore>() {
@Override
public SolrCore call() throws Exception {
if (zkSys.getZkController() != null) {
zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
}
return create(cd, false);
}
});
@@ -753,6 +757,9 @@ public class CoreContainer {
// the wait as a consequence of shutting down.
try {
if (core == null) {
if (zkSys.getZkController() != null) {
zkSys.getZkController().throwErrorIfReplicaReplaced(desc);
}
core = create(desc); // This should throw an error if it fails.
}
core.open();
@@ -856,6 +863,10 @@ public class CoreContainer {
return zkSys.getZkController();
}
public ConfigSolr getConfig() {
return cfg;
}
/** The default ShardHandlerFactory used to communicate with other solr instances */
public ShardHandlerFactory getShardHandlerFactory() {
return shardHandlerFactory;

View File

@@ -161,6 +161,13 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
*/
public abstract boolean isPersistent();
/**
* @return true if storage is shared.
*/
public boolean isSharedStorage() {
return false;
}
/**
* Releases the Directory so that it may be closed when it is no longer
* referenced.

View File

@@ -86,6 +86,8 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
this.hdfsDataDir = params.get(HDFS_HOME);
if (this.hdfsDataDir != null && this.hdfsDataDir.length() == 0) {
this.hdfsDataDir = null;
} else {
LOG.info(HDFS_HOME + "=" + this.hdfsDataDir);
}
boolean kerberosEnabled = params.getBool(KERBEROS_ENABLED, false);
LOG.info("Solr Kerberos Authentication "
@@ -253,6 +255,11 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
return true;
}
@Override
public boolean isSharedStorage() {
return true;
}
@Override
public boolean searchersReserveCommitPoints() {
return true;

View File

@@ -23,9 +23,7 @@ import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
@@ -36,6 +34,8 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.AD
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import java.io.IOException;
import java.util.ArrayList;
@@ -59,7 +59,6 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -469,15 +468,16 @@ public class CollectionsHandler extends RequestHandlerBase {
Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION,
"fromApi","true");
copyIfNotNull(req.getParams(),props,
copyIfNotNull(req.getParams(), props,
"name",
REPLICATION_FACTOR,
ZkStateReader.REPLICATION_FACTOR,
COLL_CONF,
NUM_SLICES,
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
CREATE_NODE_SET,
SHARDS_PROP,
ASYNC,
AUTO_ADD_REPLICAS,
"router.");
copyPropertiesIfNotNull(req.getParams(), props);
@@ -504,7 +504,7 @@ public class CollectionsHandler extends RequestHandlerBase {
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.REPLICATION_FACTOR, CREATE_NODE_SET, ASYNC);
copyPropertiesIfNotNull(req.getParams(), map);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(CREATESHARD, m, rsp);
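For context, a minimal SolrJ sketch of how a client might pass the new flag through this handler. The collection name and counts are illustrative, and server is assumed to be any SolrServer pointed at the cluster:

// Hypothetical client request; the param keys match those copied by CollectionsHandler above.
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.CREATE.toString());
params.set("name", "myCollection");                // illustrative collection name
params.set(OverseerCollectionProcessor.NUM_SLICES, 2);
params.set(ZkStateReader.REPLICATION_FACTOR, 2);
params.set(ZkStateReader.MAX_SHARDS_PER_NODE, 2);
params.set(ZkStateReader.AUTO_ADD_REPLICAS, true); // opt in to shared-FS failover
QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
server.request(request);

Collections created without the flag keep the COLL_PROPS default of "false", so existing collections are unaffected.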

View File

@@ -17,8 +17,25 @@
package org.apache.solr.handler.admin;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.MatchAllDocsQuery;
@@ -33,6 +50,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -68,23 +86,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
*
@@ -554,22 +557,26 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
// TODO this should be moved into CoreContainer, really...
boolean preExistingZkEntry = false;
try {
if (coreContainer.getZkController() != null) {
if(!Overseer.isLegacy(coreContainer.getZkController() .getZkStateReader().getClusterProps())){
if(dcore.getCloudDescriptor().getCoreNodeName() ==null) {
if (!Overseer.isLegacy(coreContainer.getZkController().getZkStateReader().getClusterProps())) {
if (dcore.getCloudDescriptor().getCoreNodeName() == null) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"non legacy mode coreNodeName missing "+ params);
"non legacy mode coreNodeName missing " + params);
}
}
}
// make sure we can write out the descriptor first
coreContainer.getCoresLocator().create(coreContainer, dcore);
preExistingZkEntry = checkIfCoreNodeNameAlreadyExists(dcore);
}
SolrCore core = coreContainer.create(dcore);
// only write out the descriptor if the core is successfully created
coreContainer.getCoresLocator().create(coreContainer, dcore);
if (coreContainer.getCoresLocator() instanceof SolrXMLCoresLocator) {
// hack - in this case we persist once more because a core create race might
// have dropped entries.
@@ -578,7 +585,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
rsp.add("core", core.getName());
}
catch (Exception ex) {
if (coreContainer.isZooKeeperAware() && dcore != null) {
if (coreContainer.isZooKeeperAware() && dcore != null && !preExisitingZkEntry) {
try {
coreContainer.getZkController().unregister(dcore.getName(), dcore);
} catch (InterruptedException e) {
@ -609,6 +616,27 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
}
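/**
 * Returns true if the cluster state already contains a replica whose name
 * matches this core's coreNodeName; in that case the ZK entry pre-dates this
 * create call and a failed create must not unregister it.
 */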
private boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
ZkStateReader zkStateReader = coreContainer.getZkController()
.getZkStateReader();
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName());
if (collection != null) {
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (replica.getName().equals(
dcore.getCloudDescriptor().getCoreNodeName())) {
return true;
}
}
}
}
return false;
}
/**
* Handle "RENAME" Action
*/

View File

@ -63,6 +63,7 @@ public class LocalSolrQueryRequest extends SolrQueryRequestBase {
public LocalSolrQueryRequest(SolrCore core, Map<String,String[]> args) {
super(core, new MultiMapSolrParams(args));
}
public LocalSolrQueryRequest(SolrCore core, SolrParams args) {
super(core, args);
}

View File

@ -305,7 +305,7 @@ public class HdfsUpdateLog extends UpdateLog {
if (ulogPluginInfo == null) return;
Path tlogDir = new Path(getTlogDir(core, ulogPluginInfo));
try {
if (fs.exists(tlogDir)) {
if (fs != null && fs.exists(tlogDir)) {
String[] files = getLogList(tlogDir);
for (String file : files) {
Path f = new Path(tlogDir, file);

View File

@ -47,13 +47,19 @@ public class UpdateShardHandler {
public UpdateShardHandler(ConfigSolr cfg) {
clientConnectionManager = new PoolingClientConnectionManager(SchemeRegistryFactory.createSystemDefault());
if (cfg != null) {
clientConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
clientConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
}
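// cfg may be null (tests construct new UpdateShardHandler(null)); in that
// case fall back to the HttpClientUtil defaults for the timeouts below.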
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_SO_TIMEOUT, cfg.getDistributedSocketTimeout());
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, cfg.getDistributedConnectionTimeout());
if (cfg != null) {
params.set(HttpClientUtil.PROP_SO_TIMEOUT,
cfg.getDistributedSocketTimeout());
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT,
cfg.getDistributedConnectionTimeout());
}
params.set(HttpClientUtil.PROP_USE_RETRY, false);
log.info("Creating UpdateShardHandler HTTP client with params: {}", params);
client = HttpClientUtil.createClient(params, clientConnectionManager);

View File

@ -25,3 +25,6 @@ log4j.logger.org.apache.solr.hadoop=INFO
#log4j.logger.org.apache.solr.update.TransactionLog=DEBUG
#log4j.logger.org.apache.solr.handler.ReplicationHandler=DEBUG
#log4j.logger.org.apache.solr.handler.SnapPuller=DEBUG
#log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG
#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG

View File

@ -28,6 +28,9 @@
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
<int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:45000}</int>
<int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:340000}</int>
<int name="autoReplicaFailoverWaitAfterExpiration">${autoReplicaFailoverWaitAfterExpiration:10000}</int>
<int name="autoReplicaFailoverWorkLoopDelay">${autoReplicaFailoverWorkLoopDelay:10000}</int>
<int name="autoReplicaFailoverBadNodeExpiration">${autoReplicaFailoverBadNodeExpiration:60000}</int>
</solrcloud>
<shardHandlerFactory name="shardHandlerFactory"

View File

@ -587,8 +587,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
params.set("action", CollectionAction.CREATE.toString());
params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
params.set(ZkStateReader.REPLICATION_FACTOR, numReplicas);
params.set(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
if (createNodeSetStr != null) params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
int clientIndex = clients.size() > 1 ? random().nextInt(2) : 0;

View File

@ -28,6 +28,8 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.MockConfigSolr;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
@ -253,10 +255,10 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
ZkStateReader reader = new ZkStateReader(zkClient);
LeaderElector overseerElector = new LeaderElector(zkClient);
UpdateShardHandler updateShardHandler = new UpdateShardHandler(null);
// TODO: close Overseer
Overseer overseer = new Overseer(
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader,null);
new HttpShardHandlerFactory().getShardHandler(), updateShardHandler, "/admin/cores", reader, null, new MockConfigSolr());
overseer.close();
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
address.replaceAll("/", "_"));

View File

@ -256,6 +256,7 @@ public class ClusterStateUpdateTest extends SolrTestCaseJ4 {
System.clearProperty("zkHost");
System.clearProperty("hostPort");
System.clearProperty("solrcloud.update.delay");
System.clearProperty("solr.data.dir");
}
static void printLayout(String zkHost) throws Exception {

View File

@ -17,9 +17,9 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import java.io.File;

View File

@ -17,11 +17,11 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.params.ShardParams._ROUTE_;
import java.util.ArrayList;

View File

@ -18,9 +18,8 @@ package org.apache.solr.cloud;
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.DELETEREPLICA;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import java.io.File;
@ -39,6 +38,7 @@ import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@ -162,7 +162,7 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
.getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
Map<String, Object> props = makeMap(
REPLICATION_FACTOR, replicationFactor,
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
MAX_SHARDS_PER_NODE, maxShardsPerNode,
NUM_SLICES, numShards);
Map<String,List<Integer>> collectionInfos = new HashMap<>();

View File

@ -41,9 +41,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
public class MigrateRouteKeyTest extends BasicDistributedZkTest {

View File

@ -402,22 +402,22 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
if (sendCreateNodeList) {
props = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION,
OverseerCollectionProcessor.REPLICATION_FACTOR,
ZkStateReader.REPLICATION_FACTOR,
replicationFactor.toString(), "name", COLLECTION_NAME,
"collection.configName", CONFIG_NAME,
OverseerCollectionProcessor.NUM_SLICES, numberOfSlices.toString(),
OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
ZkStateReader.MAX_SHARDS_PER_NODE,
maxShardsPerNode.toString(),
OverseerCollectionProcessor.CREATE_NODE_SET,
(createNodeList != null)?StrUtils.join(createNodeList, ','):null);
} else {
props = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerCollectionProcessor.CREATECOLLECTION,
OverseerCollectionProcessor.REPLICATION_FACTOR,
ZkStateReader.REPLICATION_FACTOR,
replicationFactor.toString(), "name", COLLECTION_NAME,
"collection.configName", CONFIG_NAME,
OverseerCollectionProcessor.NUM_SLICES, numberOfSlices.toString(),
OverseerCollectionProcessor.MAX_SHARDS_PER_NODE,
ZkStateReader.MAX_SHARDS_PER_NODE,
maxShardsPerNode.toString());
}
QueueEvent qe = new QueueEvent("id", ZkStateReader.toJSON(props), null){

View File

@ -18,9 +18,9 @@ package org.apache.solr.cloud;
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getSortedOverseerNodeNames;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getLeaderNode;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;

View File

@ -44,7 +44,9 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.MockConfigSolr;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
@ -981,8 +983,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseers.get(overseers.size() -1).close();
overseers.get(overseers.size() -1).getZkStateReader().getZkClient().close();
}
UpdateShardHandler updateShardHandler = new UpdateShardHandler(null);
Overseer overseer = new Overseer(
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader,null);
new HttpShardHandlerFactory().getShardHandler(), updateShardHandler, "/admin/cores", reader, null, new MockConfigSolr());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
address.replaceAll("/", "_"));

View File

@ -17,18 +17,8 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.StrUtils;
import org.junit.BeforeClass;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ShardRoutingCustomTest extends AbstractFullDistribZkTestBase {

View File

@ -52,9 +52,9 @@ import java.util.Random;
import java.util.Set;
import org.apache.lucene.util.LuceneTestCase.Slow;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@Slow
public class ShardSplitTest extends BasicDistributedZkTest {

View File

@ -0,0 +1,194 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
@Nightly
@Slow
@SuppressSSL
@ThreadLeakScope(Scope.NONE) // hdfs client currently leaks thread(s)
public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBase {
private static final boolean DEBUG = true;
private static MiniDFSCluster dfsCluster;
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
CompletionService<Object> completionService;
Set<Future<Object>> pending;
@BeforeClass
public static void hdfsFailoverBeforeClass() throws Exception {
dfsCluster = HdfsTestUtil.setupClass(createTempDir().getAbsolutePath());
}
@AfterClass
public static void hdfsFailoverAfterClass() throws Exception {
HdfsTestUtil.teardownClass(dfsCluster);
dfsCluster = null;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
useJettyDataDir = false;
System.setProperty("solr.xml.persist", "true");
}
protected String getSolrXml() {
return "solr-no-core.xml";
}
public SharedFSAutoReplicaFailoverTest() {
fixShardCount = true;
sliceCount = 2;
shardCount = 4;
completionService = new ExecutorCompletionService<>(executor);
pending = new HashSet<>();
checkCreatedVsState = false;
}
@Override
public void doTest() throws Exception {
try {
testBasics();
} finally {
if (DEBUG) {
super.printLayout();
}
}
}
// very slow tests, especially since jetty is started and stopped
// serially
private void testBasics() throws Exception {
String collection1 = "solrj_collection";
CollectionAdminResponse response = CollectionAdminRequest.createCollection(collection1, 2,
2, 2, null, "conf1", "myOwnField", true, cloudClient);
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
waitForRecoveriesToFinish(collection1, false);
String collection2 = "solrj_collection2";
CollectionAdminResponse response2 = CollectionAdminRequest.createCollection(collection2, 2,
2, 2, null, "conf1", "myOwnField", false, cloudClient);
assertEquals(0, response2.getStatus());
assertTrue(response2.isSuccess());
waitForRecoveriesToFinish(collection2, false);
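// Stop two nodes: collection1 was created with autoAddReplicas=true, so its
// replicas should be recreated on surviving nodes; collection2 (false) should
// stay degraded, which the counts below assert.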
ChaosMonkey.stop(jettys.get(1));
ChaosMonkey.stop(jettys.get(2));
Thread.sleep(3000);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
assertEquals(4, getLiveAndActiveCount(collection1));
assertTrue(getLiveAndActiveCount(collection2) < 4);
ChaosMonkey.stop(jettys);
ChaosMonkey.stop(controlJetty);
assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllNotLive(cloudClient.getZkStateReader(), 45000));
ChaosMonkey.start(jettys);
ChaosMonkey.start(controlJetty);
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
int jettyIndex = random().nextInt(jettys.size());
ChaosMonkey.stop(jettys.get(jettyIndex));
ChaosMonkey.start(jettys.get(jettyIndex));
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
}
private int getLiveAndActiveCount(String collection1) {
Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection1);
int liveAndActive = 0;
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName());
boolean active = replica.getStr(ZkStateReader.STATE_PROP).equals(ZkStateReader.ACTIVE);
if (live && active) {
liveAndActive++;
}
}
}
return liveAndActive;
}
private void assertSliceAndReplicaCount(String collection) {
Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
assertEquals(2, slices.size());
for (Slice slice : slices) {
assertEquals(2, slice.getReplicas().size());
}
}
@Override
public void tearDown() throws Exception {
super.tearDown();
System.clearProperty("solr.xml.persist");
}
}

View File

@ -0,0 +1,375 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerAutoReplicaFailoverThread.DownReplica;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class SharedFSAutoReplicaFailoverUtilsTest extends SolrTestCaseJ4 {
private static final String NODE6 = "baseUrl6_";
private static final String NODE6_URL = "http://baseUrl6";
private static final String NODE5 = "baseUrl5_";
private static final String NODE5_URL = "http://baseUrl5";
private static final String NODE4 = "baseUrl4_";
private static final String NODE4_URL = "http://baseUrl4";
private static final String NODE3 = "baseUrl3_";
private static final String NODE3_URL = "http://baseUrl3";
private static final String NODE2 = "baseUrl2_";
private static final String NODE2_URL = "http://baseUrl2";
private static final String NODE1 = "baseUrl1_";
private static final String NODE1_URL = "http://baseUrl1";
private final static Pattern BLUEPRINT = Pattern.compile("([a-z])(\\d+)?(?:([ARDF]))?(\\*)?");
private int buildNumber = 1;
private List<Result> results;
@Before
public void setUp() throws Exception {
super.setUp();
results = new ArrayList<>();
}
@After
public void tearDown() throws Exception {
super.tearDown();
for (Result result : results) {
result.close();
}
}
@Test
public void testGetBestCreateUrlBasics() {
Result result = buildClusterState("csr1R*r2", NODE1);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertNull("Should be no live node to failover to", createUrl);
result = buildClusterState("csr1R*r2", NODE1, NODE2);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertNull("Only failover candidate node already has a replica", createUrl);
result = buildClusterState("csr1R*r2sr3", NODE1, NODE2, NODE3);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals("Node3 does not have a replica from the bad slice and should be the best choice", NODE3_URL, createUrl);
result = buildClusterState("csr1R*r2-4sr3r4r5", NODE1, NODE2, NODE3);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertTrue(createUrl.equals(NODE2_URL) || createUrl.equals(NODE3_URL));
result = buildClusterState("csr1*r2r3sr3r3sr4", NODE1, NODE2, NODE3, NODE4);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE4_URL, createUrl);
result = buildClusterState("csr1*r2sr3r3sr4sr4", NODE1, NODE2, NODE3, NODE4);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertTrue(createUrl.equals(NODE3_URL) || createUrl.equals(NODE4_URL));
}
private static class Result implements Closeable {
DownReplica badReplica;
ZkStateReader reader;
@Override
public void close() throws IOException {
reader.close();
}
}
@Test
public void testGetBestCreateUrlMultipleCollections() throws Exception {
Result result = buildClusterState("csr*r2csr2", NODE1);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(null, createUrl);
result = buildClusterState("csr*r2csr2", NODE1);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(null, createUrl);
result = buildClusterState("csr*r2csr2", NODE1, NODE2);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(null, createUrl);
}
@Test
public void testGetBestCreateUrlMultipleCollections2() {
Result result = buildClusterState("csr*r2sr3cr2", NODE1);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(null, createUrl);
result = buildClusterState("csr*r2sr3cr2", NODE1, NODE2, NODE3);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE3_URL, createUrl);
}
@Test
public void testGetBestCreateUrlMultipleCollections3() {
Result result = buildClusterState("csr5r1sr4r2sr3r6csr2*r6sr5r3sr4r3", NODE1, NODE4, NODE5, NODE6);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE1_URL, createUrl);
}
@Test
public void testGetBestCreateUrlMultipleCollections4() {
Result result = buildClusterState("csr1r4sr3r5sr2r6csr5r6sr4r6sr5*r4", NODE6);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE6_URL, createUrl);
}
@Test
public void testFailOverToEmptySolrInstance() {
Result result = buildClusterState("csr1*r1sr1csr1", NODE2);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE2_URL, createUrl);
}
@Test
public void testFavorForeignSlices() {
Result result = buildClusterState("csr*sr2csr3r3", NODE2, NODE3);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE3_URL, createUrl);
result = buildClusterState("csr*sr2csr3r3r3r3r3r3r3", NODE2, NODE3);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE2_URL, createUrl);
}
@Test
public void testCollectionMaxNodesPerShard() {
Result result = buildClusterState("csr*sr2", 1, 1, NODE2);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(null, createUrl);
result = buildClusterState("csr*sr2", 1, 2, NODE2);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE2_URL, createUrl);
result = buildClusterState("csr*csr2r2", 1, 1, NODE2);
createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
assertEquals(NODE2_URL, createUrl);
}
private Result buildClusterState(String string, String ... liveNodes) {
return buildClusterState(string, 1, liveNodes);
}
private Result buildClusterState(String string, int replicationFactor, String ... liveNodes) {
return buildClusterState(string, replicationFactor, 10, liveNodes);
}
/**
* This method lets you construct a complex ClusterState object by using simple strings of letters.
*
* c = collection, s = slice, r = replica, \d = node number (r2 means the replica is on node 2),
* state = [A,R,D,F] (Active, Recovering, Down, recovery Failed; default is Active), * = replica to replace, binds to the left.
*
* For example:
* csrr2rD*sr2csr
*
* Creates:
*
* 'csrr2rD*'
* A collection, a shard, a replica on node 1 (the default) that is active (the default), a replica on node 2, and a replica on node 1
* that has a state of down and is the replica we will be looking to put somewhere else (the *).
*
* 'sr2'
* Then, another shard that has a replica on node 2.
*
* 'csr'
* Then, another collection that has a shard with a single active replica on node 1.
*
* Result:
* {
* "collection2":{
* "maxShardsPerNode":"1",
* "replicationFactor":"1",
* "shards":{"slice1":{
* "state":"active",
* "replicas":{"replica5":{
* "state":"active",
* "node_name":"baseUrl1_",
* "base_url":"http://baseUrl1"}}}}},
* "collection1":{
* "maxShardsPerNode":"1",
* "replicationFactor":"1",
* "shards":{
* "slice1":{
* "state":"active",
* "replicas":{
* "replica3 (bad)":{
* "state":"down",
* "node_name":"baseUrl1_",
* "base_url":"http://baseUrl1"},
* "replica2":{
* "state":"active",
* "node_name":"baseUrl2_",
* "base_url":"http://baseUrl2"},
* "replica1":{
* "state":"active",
* "node_name":"baseUrl1_",
* "base_url":"http://baseUrl1"}}},
* "slice2":{
* "state":"active",
* "replicas":{"replica4":{
* "state":"active",
* "node_name":"baseUrl2_",
* "base_url":"http://baseUrl2"}}}}}}
*
*/
@SuppressWarnings("resource")
private Result buildClusterState(String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
Result result = new Result();
Map<String,Slice> slices = null;
Map<String,Replica> replicas = null;
Map<String,Object> collectionProps = new HashMap<>();
collectionProps.put(ZkStateReader.MAX_SHARDS_PER_NODE, Integer.toString(maxShardsPerNode));
collectionProps.put(ZkStateReader.REPLICATION_FACTOR, Integer.toString(replicationFactor));
Map<String,DocCollection> collectionStates = new HashMap<>();
DocCollection docCollection = null;
Slice slice = null;
int replicaCount = 1;
Matcher m = BLUEPRINT.matcher(clusterDescription);
while (m.find()) {
Replica replica;
switch (m.group(1)) {
case "c":
slices = new HashMap<>();
docCollection = new DocCollection("collection" + (collectionStates.size() + 1), slices, collectionProps, null);
collectionStates.put(docCollection.getName(), docCollection);
break;
case "s":
replicas = new HashMap<>();
slice = new Slice("slice" + (slices.size() + 1), replicas, null);
slices.put(slice.getName(), slice);
break;
case "r":
Map<String,Object> replicaPropMap = new HashMap<>();
String node;
node = m.group(2);
if (node == null || node.trim().length() == 0) {
node = "1";
}
String state = ZkStateReader.ACTIVE;
String stateCode = m.group(3);
if (stateCode != null) {
switch (stateCode.charAt(0)) {
case 'A':
state = ZkStateReader.ACTIVE;
break;
case 'R':
state = ZkStateReader.RECOVERING;
break;
case 'D':
state = ZkStateReader.DOWN;
break;
case 'F':
state = ZkStateReader.RECOVERY_FAILED;
break;
default:
throw new IllegalArgumentException(
"Unexpected state for replica: " + stateCode);
}
}
String nodeName = "baseUrl" + node + "_";
String replicaName = "replica" + replicaCount++;
if ("*".equals(m.group(4))) {
replicaName += " (bad)";
}
replicaPropMap.put(ZkStateReader.NODE_NAME_PROP, nodeName);
replicaPropMap.put(ZkStateReader.BASE_URL_PROP, "http://baseUrl" + node);
replicaPropMap.put(ZkStateReader.STATE_PROP, state);
replica = new Replica(replicaName, replicaPropMap);
if ("*".equals(m.group(4))) {
result.badReplica = new DownReplica();
result.badReplica.replica = replica;
result.badReplica.slice = slice;
result.badReplica.collection = docCollection;
}
replicas.put(replica.getName(), replica);
break;
default:
break;
}
}
// trunk briefly had ClusterState take a ZkStateReader :( the double construction
// below works around that; leaving it until that issue is resolved.
MockZkStateReader reader = new MockZkStateReader(null, collectionStates.keySet());
ClusterState clusterState = new ClusterState(1, new HashSet<String>(Arrays.asList(liveNodes)), collectionStates);
reader = new MockZkStateReader(clusterState, collectionStates.keySet());
String json;
try {
json = new String(ZkStateReader.toJSON(clusterState), "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Unexpected", e);
}
System.err.println("build:" + buildNumber++);
System.err.println(json);
assert result.badReplica != null : "Is there no bad replica?";
assert result.badReplica.slice != null : "Bad replica has no slice?";
result.reader = reader;
results.add(result);
return result;
}
}
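A hedged usage sketch of the blueprint DSL documented above; this case is not part of the commit, and the expected value is inferred from the selection rules the existing tests exercise (a live node that hosts no replica of the bad slice is a legal target):

Result result = buildClusterState("csr1*sr2csr2", NODE2);
String createUrl = OverseerAutoReplicaFailoverThread.getBestCreateUrl(result.reader, result.badReplica);
// node 2 hosts a sibling slice and another collection, but nothing from the
// bad slice itself, so it should be the (only) legal create target
assertEquals(NODE2_URL, createUrl);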

View File

@ -28,11 +28,14 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.CoresLocator;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.ExternalPaths;
import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
@ -315,6 +318,31 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
@Override
public void load() {};
@Override
public ConfigSolr getConfig() {
return new ConfigSolr() {
@Override
public CoresLocator getCoresLocator() {
throw new UnsupportedOperationException();
}
@Override
protected String getShardHandlerFactoryConfigPath() {
throw new UnsupportedOperationException();
}
@Override
public boolean isPersistent() {
throw new UnsupportedOperationException();
}};
}
@Override
public UpdateShardHandler getUpdateShardHandler() {
return new UpdateShardHandler(null);
}
@Override
public String getAdminPath() {
return "/admin/cores";

View File

@ -61,13 +61,15 @@ public class HdfsTestUtil {
System.setProperty("test.cache.data", dir + File.separator + "hdfs" + File.separator + "cache");
System.setProperty("solr.lock.type", "hdfs");
System.setProperty("solr.hdfs.home", "/solr_hdfs_home");
System.setProperty("solr.hdfs.blockcache.global", Boolean.toString(LuceneTestCase.random().nextBoolean()));
final MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, dataNodes, true, null);
dfsCluster.waitActive();
System.setProperty("solr.hdfs.home", getDataDir(dfsCluster, "solr_hdfs_home"));
NameNodeAdapter.enterSafeMode(dfsCluster.getNameNode(), false);
int rnd = LuceneTestCase.random().nextInt(10000);

View File

@ -138,10 +138,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
}
private static JettySolrRunner createJetty(SolrInstance instance) throws Exception {
System.setProperty("solr.data.dir", instance.getDataDir());
FileUtils.copyFile(new File(SolrTestCaseJ4.TEST_HOME(), "solr.xml"), new File(instance.getHomeDir(), "solr.xml"));
JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
jetty.setDataDir(instance.getDataDir());
jetty.start();
return jetty;
}

View File

@ -63,10 +63,9 @@ public class TestReplicationHandlerBackup extends SolrJettyTestBase {
String backupKeepParamName = ReplicationHandler.NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM;
private static JettySolrRunner createJetty(TestReplicationHandler.SolrInstance instance) throws Exception {
System.setProperty("solr.data.dir", instance.getDataDir());
FileUtils.copyFile(new File(SolrTestCaseJ4.TEST_HOME(), "solr.xml"), new File(instance.getHomeDir(), "solr.xml"));
JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
jetty.setDataDir(instance.getDataDir());
jetty.start();
return jetty;
}

View File

@ -90,8 +90,6 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
throw new RuntimeException(e);
}
//hdfsDataDir = hdfsUri + "/solr/shard1";
// System.setProperty("solr.data.dir", hdfsUri + "/solr/shard1");
System.setProperty("solr.ulog.dir", hdfsUri + "/solr/shard1");
initCore("solrconfig-tlog.xml","schema15.xml");
@ -100,7 +98,6 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
@AfterClass
public static void afterClass() throws Exception {
System.clearProperty("solr.ulog.dir");
System.clearProperty("solr.data.dir");
System.clearProperty("test.build.data");
System.clearProperty("test.cache.data");
deleteCore();

View File

@ -0,0 +1,43 @@
package org.apache.solr.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.core.ConfigSolr;
import org.apache.solr.core.CoresLocator;
/**
*
*/
public class MockConfigSolr extends ConfigSolr {
@Override
public CoresLocator getCoresLocator() {
return null;
}
@Override
protected String getShardHandlerFactoryConfigPath() {
return null;
}
@Override
public boolean isPersistent() {
return false;
}
}

View File

@ -76,6 +76,7 @@ public class CollectionAdminRequest extends SolrRequest
protected Integer numShards;
protected Integer maxShardsPerNode;
protected Integer replicationFactor;
protected Boolean autoAddReplicas;
public Create() {
@ -89,6 +90,7 @@ public class CollectionAdminRequest extends SolrRequest
public void setRouterField(String routerField) { this.routerField = routerField; }
public void setNumShards(Integer numShards) {this.numShards = numShards;}
public void setMaxShardsPerNode(Integer numShards) { this.maxShardsPerNode = numShards; }
public void setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; }
public void setReplicationFactor(Integer repl) { this.replicationFactor = repl; }
public String getConfigName() { return configName; }
@ -98,6 +100,7 @@ public class CollectionAdminRequest extends SolrRequest
public Integer getNumShards() { return numShards; }
public Integer getMaxShardsPerNode() { return maxShardsPerNode; }
public Integer getReplicationFactor() { return replicationFactor; }
public Boolean getAutoAddReplicas() { return autoAddReplicas; }
@Override
public SolrParams getParams() {
@ -131,6 +134,9 @@ public class CollectionAdminRequest extends SolrRequest
if (asyncId != null) {
params.set("async", asyncId);
}
if (autoAddReplicas != null) {
params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
}
return params;
}
@ -385,6 +391,28 @@ public class CollectionAdminRequest extends SolrRequest
return createCollection(name, shards, repl, maxShards, nodeSet, conf, routerField, server, null);
}
// creates collection using a compositeId router
public static CollectionAdminResponse createCollection( String name,
Integer shards, Integer repl, Integer maxShards,
String nodeSet,
String conf,
String routerField,
Boolean autoAddReplicas,
SolrServer server) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
req.setRouterName("compositeId");
req.setNumShards(shards);
req.setReplicationFactor(repl);
req.setMaxShardsPerNode(maxShards);
req.setCreateNodeSet(nodeSet);
req.setConfigName(conf);
req.setRouterField(routerField);
req.setAutoAddReplicas(autoAddReplicas);
return req.process( server );
}
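// Hedged usage sketch, not part of this commit: how a client might request a
// collection with autoAddReplicas enabled; collection/config names are examples.
private static CollectionAdminResponse exampleCreateWithAutoAddReplicas(SolrServer server)
    throws SolrServerException, IOException {
  Create req = new Create();
  req.setCollectionName("myCollection");   // hypothetical collection name
  req.setRouterName("compositeId");
  req.setNumShards(2);
  req.setReplicationFactor(2);
  req.setMaxShardsPerNode(2);
  req.setConfigName("conf1");              // hypothetical config set name
  req.setAutoAddReplicas(true);            // serialized as the autoAddReplicas param
  return req.process(server);
}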
// creates collection using a compositeId router
public static CollectionAdminResponse createCollection( String name,
Integer shards, Integer repl, Integer maxShards,

View File

@ -17,6 +17,10 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
/**
* @deprecated because this class is no longer used internally and will be removed
*/
@Deprecated
public interface ClosableThread {
public void close();
public boolean isClosed();

View File

@ -193,7 +193,6 @@ public class ClusterState implements JSONWriter.Writable {
return getShardId(null, nodeName, coreName);
}
public String getShardId(String collectionName, String nodeName, String coreName) {
Collection<DocCollection> states = collectionStates.values();
if (collectionName != null) {
@ -216,6 +215,25 @@ public class ClusterState implements JSONWriter.Writable {
return null;
}
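/**
 * Returns the name of the slice that contains a replica with the given
 * coreNodeName, searching the named collection if one is given, otherwise
 * all collections; null if no such replica exists.
 */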
public String getShardIdByCoreNodeName(String collectionName, String coreNodeName) {
Collection<DocCollection> states = collectionStates.values();
if (collectionName != null) {
DocCollection c = getCollectionOrNull(collectionName);
if (c != null) states = Collections.singletonList(c);
}
for (DocCollection coll : states) {
for (Slice slice : coll.getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (coreNodeName.equals(replica.getName())) {
return slice.getName();
}
}
}
}
return null;
}
/**
* Check if node is alive.
*/

View File

@ -0,0 +1,230 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClusterStateUtil {
private static Logger log = LoggerFactory.getLogger(ClusterStateUtil.class);
private static final int TIMEOUT_POLL_MS = 1000;
/**
* Wait to see *all* cores live and active.
*
* @param zkStateReader
* to use for ClusterState
* @param timeoutInMs
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllActiveAndLive(zkStateReader, null, timeoutInMs);
}
/**
* Wait to see *all* cores live and active.
*
* @param zkStateReader
* to use for ClusterState
* @param collection to look at
* @param timeoutInMs
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, String collection,
int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
while (System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
Set<String> collections;
if (collection != null) {
collections = Collections.singleton(collection);
} else {
collections = clusterState.getCollections();
}
for (String coll : collections) {
DocCollection docCollection = clusterState.getCollection(coll);
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
// only look at active shards
if (slice.getState().equals(Slice.ACTIVE)) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
// on a live node?
boolean live = clusterState.liveNodesContain(replica
.getNodeName());
String state = replica.getStr(ZkStateReader.STATE_PROP);
if (!live || !state.equals(ZkStateReader.ACTIVE)) {
// fail
success = false;
}
}
}
}
}
if (!success) {
try {
Thread.sleep(TIMEOUT_POLL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
}
}
}
return success;
}
/**
* Wait to see an entry in the ClusterState with a specific coreNodeName and
* baseUrl.
*
* @param zkStateReader
* to use for ClusterState
* @param collection
* to look in
* @param coreNodeName
* to wait for
* @param baseUrl
* to wait for
* @param timeoutInMs
* how long to wait before giving up
* @return false if timed out
*/
public static boolean waitToSeeLive(ZkStateReader zkStateReader,
String collection, String coreNodeName, String baseUrl,
int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
while (System.nanoTime() < timeout) {
log.debug("waiting to see replica just created live collection={} replica={} baseUrl={}",
collection, coreNodeName, baseUrl);
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollection(collection);
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
// only look at active shards
if (slice.getState().equals(Slice.ACTIVE)) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
// on a live node?
boolean live = clusterState.liveNodesContain(replica.getNodeName());
String rcoreNodeName = replica.getName();
String rbaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
if (live && coreNodeName.equals(rcoreNodeName)
&& baseUrl.equals(rbaseUrl)) {
// found it
return true;
}
}
}
}
try {
Thread.sleep(TIMEOUT_POLL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
}
}
log.error("Timed out waiting to see replica just created in cluster state. Continuing...");
return false;
}
public static boolean waitForAllNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
return waitForAllNotLive(zkStateReader, null, timeoutInMs);
}
public static boolean waitForAllNotLive(ZkStateReader zkStateReader,
String collection, int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
while (System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
Set<String> collections;
if (collection == null) {
collections = clusterState.getCollections();
} else {
collections = Collections.singleton(collection);
}
for (String coll : collections) {
DocCollection docCollection = clusterState.getCollection(coll);
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
// only look at active shards
if (slice.getState().equals(Slice.ACTIVE)) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
// on a live node?
boolean live = clusterState.liveNodesContain(replica
.getNodeName());
if (live) {
// fail
success = false;
}
}
}
}
}
if (!success) {
try {
Thread.sleep(TIMEOUT_POLL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
}
}
}
}
return success;
}
public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) {
ClusterState clusterState = reader.getClusterState();
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection != null) {
return docCollection.getAutoAddReplicas();
}
}
return false;
}
}
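For orientation, a hedged usage sketch of the new utility as a cloud test might call it (the collection name and timeout are illustrative; cloudClient is assumed to come from a cloud test base class):

ZkStateReader reader = cloudClient.getZkStateReader();
// block until every replica of the collection is on a live node and ACTIVE,
// or 120s pass; the method returns false on timeout rather than throwing
if (!ClusterStateUtil.waitForAllActiveAndLive(reader, "collection1", 120000)) {
  fail("timed out waiting for all replicas to be live and active");
}
// read the autoAddReplicas flag for the collection from cluster state
boolean auto = ClusterStateUtil.isAutoAddReplicas(reader, "collection1");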

View File

@ -17,16 +17,17 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
/**
* Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
*/
@ -40,6 +41,11 @@ public class DocCollection extends ZkNodeProps {
private final Map<String, Slice> activeSlices;
private final DocRouter router;
private final Integer replicationFactor;
private final Integer maxShardsPerNode;
private final boolean autoAddReplicas;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, -1);
}
@ -56,6 +62,24 @@ public class DocCollection extends ZkNodeProps {
this.slices = slices;
this.activeSlices = new HashMap<>();
Object replicationFactorObject = props.get(ZkStateReader.REPLICATION_FACTOR);
if (replicationFactorObject != null) {
this.replicationFactor = Integer.parseInt(replicationFactorObject.toString());
} else {
this.replicationFactor = null;
}
Object maxShardsPerNodeObject = props.get(ZkStateReader.MAX_SHARDS_PER_NODE);
if (maxShardsPerNodeObject != null) {
this.maxShardsPerNode = Integer.parseInt(maxShardsPerNodeObject.toString());
} else {
this.maxShardsPerNode = null;
}
Object autoAddReplicasObject = props.get(ZkStateReader.AUTO_ADD_REPLICAS);
if (autoAddReplicasObject != null) {
this.autoAddReplicas = Boolean.parseBoolean(autoAddReplicasObject.toString());
} else {
this.autoAddReplicas = false;
}
Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
@ -112,7 +136,25 @@ public class DocCollection extends ZkNodeProps {
public int getVersion(){
return version;
}
/**
* @return replication factor for this collection or null if no
* replication factor exists.
*/
public Integer getReplicationFactor() {
return replicationFactor;
}
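/**
 * @return true if autoAddReplicas is enabled for this collection, false if
 *         the property is absent or set to false.
 */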
public boolean getAutoAddReplicas() {
return autoAddReplicas;
}
public int getMaxShardsPerNode() {
if (maxShardsPerNode == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, ZkStateReader.MAX_SHARDS_PER_NODE + " is not in the cluster state.");
}
return maxShardsPerNode;
}

View File

@ -17,11 +17,11 @@ package org.apache.solr.common.cloud;
* the License.
*/
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* ZooKeeper. This class handles synchronous connects and reconnections.
*
*/
public class SolrZkClient {
public class SolrZkClient implements Closeable {
// These should *only* be used for debugging or monitoring purposes
public static final AtomicLong numOpens = new AtomicLong();
public static final AtomicLong numCloses = new AtomicLong();
@ -87,6 +87,11 @@ public class SolrZkClient {
return zkClientTimeout;
}
// expert: for tests
public SolrZkClient() {
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
}

View File

@ -32,26 +32,25 @@ import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ZkStateReader {
public class ZkStateReader implements Closeable {
private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
public static final String BASE_URL_PROP = "base_url";
@ -75,6 +74,9 @@ public class ZkStateReader {
public static final String CLUSTER_STATE = "/clusterstate.json";
public static final String CLUSTER_PROPS = "/clusterprops.json";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
public static final String ROLES = "/roles.json";
@ -91,7 +93,7 @@ public class ZkStateReader {
public static final String URL_SCHEME = "urlScheme";
private volatile ClusterState clusterState;
protected volatile ClusterState clusterState;
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
@ -101,7 +103,6 @@ public class ZkStateReader {
//
// convenience methods... should these go somewhere else?
//
@ -440,11 +441,11 @@ public class ZkStateReader {
liveNodesSet.addAll(liveNodes);
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
log.debug("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,this);
} else {
log.info("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
clusterState = this.clusterState;
clusterState.setLiveNodes(liveNodesSet);
}
@ -453,16 +454,16 @@ public class ZkStateReader {
} else {
if (clusterStateUpdateScheduled) {
log.info("Cloud state update for ZooKeeper already scheduled");
log.debug("Cloud state update for ZooKeeper already scheduled");
return;
}
log.info("Scheduling cloud state update from ZooKeeper...");
log.debug("Scheduling cloud state update from ZooKeeper...");
clusterStateUpdateScheduled = true;
updateCloudExecutor.schedule(new Runnable() {
@Override
public void run() {
log.info("Updating cluster state from ZooKeeper...");
log.debug("Updating cluster state from ZooKeeper...");
synchronized (getUpdateLock()) {
clusterStateUpdateScheduled = false;
ClusterState clusterState;
@ -473,11 +474,11 @@ public class ZkStateReader {
liveNodesSet.addAll(liveNodes);
if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
log.debug("Updating cloud state from ZooKeeper... ");
clusterState = ClusterState.load(zkClient, liveNodesSet,ZkStateReader.this);
} else {
log.info("Updating live nodes from ZooKeeper... ");
log.debug("Updating live nodes from ZooKeeper... ");
clusterState = ZkStateReader.this.clusterState;
clusterState.setLiveNodes(liveNodesSet);
@ -643,7 +644,7 @@ public class ZkStateReader {
public Map getClusterProps(){
Map result = null;
try {
if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS,true)){
if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)){
result = (Map) ZkStateReader.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
} else {
result = new LinkedHashMap();

View File

@ -17,7 +17,6 @@
package org.apache.solr.client.solrj;
import org.apache.solr.util.AbstractSolrTestCase;
import org.junit.BeforeClass;
@ -26,13 +25,12 @@ import org.junit.BeforeClass;
*
* This lets us try various SolrServer implementations with the same tests.
*
*
* @since solr 1.3
*/
abstract public class SolrExampleTestBase extends AbstractSolrTestCase
{
abstract public class SolrExampleTestBase extends AbstractSolrTestCase {
@Override
public String getSolrHome() { return "../../../example/solr/"; }
public String getSolrHome() {
return "../../../example/solr/";
}
@BeforeClass
public static void beforeClass() throws Exception {
@ -40,14 +38,20 @@ abstract public class SolrExampleTestBase extends AbstractSolrTestCase
}
@Override
public void setUp() throws Exception
{
public void setUp() throws Exception {
ignoreException("maxWarmingSearchers");
super.setUp();
// this sets the property for jetty starting SolrDispatchFilter
System.setProperty( "solr.solr.home", this.getSolrHome() );
System.setProperty( "solr.data.dir", this.initCoreDataDir.getCanonicalPath() );
System.setProperty("solr.solr.home", this.getSolrHome());
System.setProperty("solr.data.dir", this.initCoreDataDir.getCanonicalPath());
}
@Override
public void tearDown() throws Exception {
System.clearProperty("solr.solr.home");
System.clearProperty("solr.data.dir");
super.tearDown();
}
/**

View File

@ -310,7 +310,7 @@ public class TestLBHttpSolrServer extends SolrTestCaseJ4 {
public void startJetty() throws Exception {
jetty = new JettySolrRunner(getHomeDir(), "/solr", port, "bad_solrconfig.xml", null, true, null, sslConfig);
System.setProperty("solr.data.dir", getDataDir());
jetty.setDataDir(getDataDir());
jetty.start();
int newPort = jetty.getLocalPort();
if (port != 0 && newPort != port) {

View File

@ -88,6 +88,7 @@ public class JettyWebappTest extends SolrTestCaseJ4
server.stop();
} catch( Exception ex ) {}
System.clearProperty("tests.shardhandler.randomSeed");
System.clearProperty("solr.data.dir");
super.tearDown();
}

View File

@ -36,7 +36,6 @@ import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServer;
@ -286,8 +285,13 @@ public abstract class BaseDistributedSearchTestCase extends SolrTestCaseJ4 {
super.tearDown();
}
protected JettySolrRunner createControlJetty() throws Exception {
JettySolrRunner jetty = createJetty(new File(getSolrHome()), testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
return jetty;
}
protected void createServers(int numShards) throws Exception {
controlJetty = createJetty(new File(getSolrHome()), testDir + "/control/data", null, getSolrConfigFile(), getSchemaFile());
controlJetty = createControlJetty();
controlClient = createNewSolrServer(controlJetty.getLocalPort());

View File

@@ -55,13 +55,15 @@ abstract public class SolrJettyTestBase extends SolrTestCaseJ4
ignoreException("maxWarmingSearchers");
// this sets the property for jetty starting SolrDispatchFilter
System.setProperty( "solr.data.dir", createTempDir().getCanonicalPath() );
context = context==null ? "/solr" : context;
SolrJettyTestBase.context = context;
jetty = new JettySolrRunner(solrHome, context, 0, configFile, schemaFile, stopAtShutdown, extraServlets, sslConfig);
// this sets the property for jetty starting SolrDispatchFilter
if (System.getProperty("solr.data.dir") == null && System.getProperty("solr.hdfs.home") == null) {
jetty.setDataDir(createTempDir().getCanonicalPath());
}
jetty.start();
port = jetty.getLocalPort();
log.info("Jetty Assigned Port#" + port);

View File

@@ -18,11 +18,11 @@ package org.apache.solr.cloud;
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import java.io.File;
import java.io.IOException;
@@ -62,7 +62,6 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -272,35 +271,46 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected void createServers(int numServers) throws Exception {
System.setProperty("collection", "control_collection");
String numShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
// we want hashes by default for the control, so set to 1 shard as opposed to leaving unset
// System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
String oldNumShards = System.getProperty(ZkStateReader.NUM_SHARDS_PROP);
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "1");
try {
File controlJettyDir = createTempDir();
setupJettySolrHome(controlJettyDir);
controlJetty = createJetty(controlJettyDir, testDir + "/control/data"); // don't pass shard name... let it default to "shard1"
System.clearProperty("collection");
if(numShards != null) {
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, numShards);
} else {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
}
controlJetty = createJetty(controlJettyDir, useJettyDataDir ? getDataDir(testDir
+ "/control/data") : null); // don't pass shard name... let it default to
// "shard1"
controlClient = createNewSolrServer(controlJetty.getLocalPort());
if (sliceCount <= 0) {
// for now, just create the cloud client for the control if we don't create the normal cloud client.
// for now, just create the cloud client for the control if we don't
// create the normal cloud client.
// this can change if more tests need it.
controlClientCloud = createCloudClient("control_collection");
controlClientCloud.connect();
waitForCollection(controlClientCloud.getZkStateReader(), "control_collection", 0);
waitForCollection(controlClientCloud.getZkStateReader(),
"control_collection", 0);
// NOTE: we are skipping creation of the chaos monkey by returning here
cloudClient = controlClientCloud; // temporary - some code needs/uses cloudClient
cloudClient = controlClientCloud; // temporary - some code needs/uses
// cloudClient
return;
}
} finally {
System.clearProperty("collection");
if (oldNumShards != null) {
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, oldNumShards);
} else {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
}
}
initCloud();
@@ -1474,8 +1484,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
numShards = StrUtils.splitSmart(shardNames,',').size();
}
Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
if(numShards==null){
numShards = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
if(replicationFactor==null){
replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
}
if (confSetName != null) {
@@ -1708,9 +1718,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
while (System.currentTimeMillis() < timeoutAt) {
getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
// Map<String,DocCollection> collections = clusterState
// .getCollectionStates();
if (! clusterState.hasCollection(collectionName)) {
if (!clusterState.hasCollection(collectionName)) {
found = false;
break;
}
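Note: createServers above also adopts a save/set/restore discipline for the numShards system property, so one test's shard count cannot leak into the next. The same pattern in isolation, as a generic sketch (the helper is hypothetical; "numShards" matches ZkStateReader.NUM_SHARDS_PROP):

    // Generic save/set/restore sketch for a JVM-wide property.
    static void withNumShards(String value, Runnable work) {
      String old = System.getProperty("numShards");
      System.setProperty("numShards", value);
      try {
        work.run();
      } finally {
        if (old != null) {
          System.setProperty("numShards", old);
        } else {
          System.clearProperty("numShards"); // was unset before; leave it unset
        }
      }
    }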

View File

@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.Filter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -230,8 +231,15 @@
}
}
public static void kill(CloudJettyRunner cjetty) throws Exception {
FilterHolder filterHolder = cjetty.jetty.getDispatchFilter();
public static void kill(List<JettySolrRunner> jettys) throws Exception {
for (JettySolrRunner jetty : jettys) {
kill(jetty);
}
}
public static void kill(JettySolrRunner jetty) throws Exception {
FilterHolder filterHolder = jetty.getDispatchFilter();
if (filterHolder != null) {
Filter filter = filterHolder.getFilter();
if (filter != null) {
@@ -244,9 +252,8 @@
}
}
IpTables.blockPort(cjetty.jetty.getLocalPort());
IpTables.blockPort(jetty.getLocalPort());
JettySolrRunner jetty = cjetty.jetty;
monkeyLog("kill shard! " + jetty.getLocalPort());
jetty.stop();
@@ -258,6 +265,10 @@
}
}
public static void kill(CloudJettyRunner cjetty) throws Exception {
kill(cjetty.jetty);
}
public void stopShard(String slice) throws Exception {
List<CloudJettyRunner> jetties = shardToJetty.get(slice);
for (CloudJettyRunner jetty : jetties) {
@@ -556,10 +567,22 @@ public class ChaosMonkey {
return starts.get();
}
public static void stop(List<JettySolrRunner> jettys) throws Exception {
for (JettySolrRunner jetty : jettys) {
stop(jetty);
}
}
public static void stop(JettySolrRunner jetty) throws Exception {
stopJettySolrRunner(jetty);
}
public static void start(List<JettySolrRunner> jettys) throws Exception {
for (JettySolrRunner jetty : jettys) {
start(jetty);
}
}
public static boolean start(JettySolrRunner jetty) throws Exception {
IpTables.unblockPort(jetty.getLocalPort());
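Note: the new list overloads let a test bounce the whole cluster without hand-rolled loops. A minimal sketch, assuming a test that holds a List<JettySolrRunner> named jettys (as AbstractFullDistribZkTestBase does; the helper itself is hypothetical):

    // Hypothetical test helper: bounce the entire cluster.
    private void bounceCluster(List<JettySolrRunner> jettys) throws Exception {
      ChaosMonkey.stop(jettys);  // stops each runner in turn
      // ... assertions against the degraded cluster could go here ...
      ChaosMonkey.start(jettys); // unblocks ports, then restarts each runner
    }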

View File

@@ -228,4 +228,5 @@ public class CloudInspectUtil {
return true;
}
}

View File

@@ -0,0 +1,46 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
// does not yet mock zkclient at all
public class MockSolrZkClient extends SolrZkClient {
public MockSolrZkClient() {
super();
}
@Override
public Boolean exists(final String path, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
return false;
}
@Override
public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
return null;
}
}

View File

@@ -0,0 +1,40 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Set;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
// does not yet mock zkclient at all
public class MockZkStateReader extends ZkStateReader {
private Set<String> collections;
public MockZkStateReader(ClusterState clusterState, Set<String> collections) {
super(new MockSolrZkClient());
this.clusterState = clusterState;
this.collections = collections;
}
public Set<String> getAllCollections(){
return collections;
}
}
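Note: together the two mocks let cluster-state logic be unit-tested without a live ZooKeeper. A hypothetical test sketch (the collection name is illustrative; passing null for the ClusterState is enough when only getAllCollections() is exercised):

    Set<String> collections = Collections.singleton("collection1"); // java.util.Collections
    MockZkStateReader reader = new MockZkStateReader(null, collections);
    assertEquals(collections, reader.getAllCollections()); // org.junit.Assert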