mirror of https://github.com/apache/lucene.git

SOLR-4114: Allow creating more than one shard per instance with the Collection API.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1417045 13f79535-47bb-0310-9956-ffa450edef68

parent d3d4ee84cd
commit 9469102f30
@@ -106,6 +106,8 @@ New Features
 * SOLR-4087: Add MAX_DOC_FREQ option to MoreLikeThis.
   (Andrew Janowczyk via Mark Miller)
 
+* SOLR-4114: Allow creating more than one shard per instance with the
+  Collection API. (Per Steffensen, Mark Miller)
 
 Optimizations
 ----------------------
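For context, the feature is driven entirely through Collections API request parameters. A minimal sketch of a CREATE call using the new maxShardsPerNode knob, mirroring the createCollection() test helper changed further down (the collection name and counts here are illustrative):

    // Build a Collections API CREATE request; the key constants come from
    // OverseerCollectionProcessor, the concrete values are made up.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionAction.CREATE.toString());
    params.set(OverseerCollectionProcessor.NUM_SLICES, 2);           // "numShards"
    params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, 1);   // "replicationFactor"
    params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, 4);  // "maxShardsPerNode"
    params.set("name", "mycollection");
    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");
    createNewSolrServer("", baseUrl).request(request);  // any live Solr base URL works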
@@ -43,8 +43,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OverseerCollectionProcessor implements Runnable {
+  
+  public static final String NUM_SLICES = "numShards";
   
   public static final String REPLICATION_FACTOR = "replicationFactor";
+  
+  public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
   
   public static final String DELETECOLLECTION = "deletecollection";
   
   public static final String CREATECOLLECTION = "createcollection";
@@ -164,120 +169,147 @@ public class OverseerCollectionProcessor implements Runnable {
       SolrException.log(log, "collection already exists: " + collectionName);
       return false;
     }
-    
-    // look at the replication factor and see if it matches reality
-    // if it does not, find best nodes to create more cores
-    
-    String numReplicasString = message.getStr(REPLICATION_FACTOR);
-    int numReplicas;
-    try {
-      numReplicas = numReplicasString == null ? 0 : Integer.parseInt(numReplicasString);
-    } catch (Exception ex) {
-      SolrException.log(log, "Could not parse " + REPLICATION_FACTOR, ex);
-      return false;
-    }
-    String numShardsString = message.getStr("numShards");
-    int numShards;
-    try {
-      numShards = numShardsString == null ? 0 : Integer.parseInt(numShardsString);
-    } catch (Exception ex) {
-      SolrException.log(log, "Could not parse numShards", ex);
-      return false;
-    }
-    
-    if (numReplicas < 0) {
-      SolrException.log(log, REPLICATION_FACTOR + " must be > 0");
-      return false;
-    }
-    
-    if (numShards < 0) {
-      SolrException.log(log, "numShards must be > 0");
-      return false;
-    }
-    
-    String name = message.getStr("name");
-    String configName = message.getStr("collection.configName");
-    
-    // we need to look at every node and see how many cores it serves
-    // add our new cores to existing nodes serving the least number of cores
-    // but (for now) require that each core goes on a distinct node.
-    
-    // TODO: add smarter options that look at the current number of cores per node?
-    // for now we just go random
-    Set<String> nodes = clusterState.getLiveNodes();
-    List<String> nodeList = new ArrayList<String>(nodes.size());
-    nodeList.addAll(nodes);
-    Collections.shuffle(nodeList);
-    
-    int numNodes = numShards * (numReplicas + 1);
-    if (nodeList.size() < numNodes) {
-      log.warn("Not enough nodes available to satisfy create collection request for collection:"
-          + collectionName
-          + " nodes needed:"
-          + numNodes
-          + " nodes available:"
-          + nodeList.size() + " - using nodes available");
-    }
-    
-    List<String> createOnNodes = nodeList.subList(0, Math.min(nodeList.size(), numNodes));
-    
-    log.info("Create collection " + name + " on " + createOnNodes);
-    
-    for (String replica : createOnNodes) {
-      // TODO: this does not work if original url had _ in it
-      // We should have a master list
-      // Need to create new params for each request
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-      
-      replica = replica.replaceAll("_", "/");
-      params.set(CoreAdminParams.NAME, name);
-      params.set("collection.configName", configName);
-      params.set("numShards", numShards);
-      ShardRequest sreq = new ShardRequest();
-      params.set("qt", adminPath);
-      sreq.purpose = 1;
-      // TODO: this sucks
-      if (replica.startsWith("http://")) replica = replica.substring(7);
-      sreq.shards = new String[] {replica};
-      sreq.actualShards = sreq.shards;
-      sreq.params = params;
-      
-      shardHandler.submit(sreq, replica, sreq.params);
-    }
-    
-    int failed = 0;
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        Throwable e = srsp.getException();
-        if (e != null) {
-          // should we retry?
-          // TODO: we should return errors to the client
-          // TODO: what if one fails and others succeed?
-          failed++;
-          log.error("Error talking to shard: " + srsp.getShard(), e);
-        }
-      }
-    } while (srsp != null);
-    
-    // if all calls succeeded, return true
-    if (failed > 0) {
-      return false;
-    }
-    return true;
+    
+    try {
+      // look at the replication factor and see if it matches reality
+      // if it does not, find best nodes to create more cores
+      
+      int numReplica = msgStrToInt(message, REPLICATION_FACTOR, 0);
+      int numSlices = msgStrToInt(message, NUM_SLICES, 0);
+      int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
+      
+      if (numReplica < 0) {
+        SolrException.log(log, REPLICATION_FACTOR + " must be > 0");
+        return false;
+      }
+      
+      if (numSlices < 0) {
+        SolrException.log(log, NUM_SLICES + " must be > 0");
+        return false;
+      }
+      
+      String configName = message.getStr("collection.configName");
+      
+      // we need to look at every node and see how many cores it serves
+      // add our new cores to existing nodes serving the least number of cores
+      // but (for now) require that each core goes on a distinct node.
+      
+      // TODO: add smarter options that look at the current number of cores per
+      // node?
+      // for now we just go random
+      Set<String> nodes = clusterState.getLiveNodes();
+      List<String> nodeList = new ArrayList<String>(nodes.size());
+      nodeList.addAll(nodes);
+      Collections.shuffle(nodeList);
+      
+      if (nodeList.size() <= 0) {
+        log.error("Cannot create collection " + collectionName
+            + ". No live Solr instances");
+        return false;
+      }
+      
+      int numShardsPerSlice = numReplica + 1;
+      if (numShardsPerSlice > nodeList.size()) {
+        log.warn("Specified "
+            + REPLICATION_FACTOR
+            + " of "
+            + numReplica
+            + " on collection "
+            + collectionName
+            + " is higher than or equal to the number of Solr instances currently live ("
+            + nodeList.size()
+            + "). It's unusual to run two replicas of the same slice on the same Solr instance.");
+      }
+      
+      int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
+      int requestedShardsToCreate = numSlices * numShardsPerSlice;
+      if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+        log.error("Cannot create collection " + collectionName + ". Value of "
+            + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+            + ", and the number of live nodes is " + nodeList.size()
+            + ". This allows a maximum of " + maxShardsAllowedToCreate
+            + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+            + " and value of " + REPLICATION_FACTOR + " is " + numReplica
+            + ". This requires " + requestedShardsToCreate
+            + " shards to be created (higher than the allowed number)");
+        return false;
+      }
+      
+      for (int i = 1; i <= numSlices; i++) {
+        for (int j = 1; j <= numShardsPerSlice; j++) {
+          String nodeName = nodeList.get(((i - 1) + (j - 1)) % nodeList.size());
+          String sliceName = "shard" + i;
+          String shardName = collectionName + "_" + sliceName + "_replica" + j;
+          log.info("Creating shard " + shardName + " as part of slice "
+              + sliceName + " of collection " + collectionName + " on "
+              + nodeName);
+          
+          // Need to create new params for each request
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+          
+          params.set(CoreAdminParams.NAME, shardName);
+          params.set("collection.configName", configName);
+          params.set(CoreAdminParams.COLLECTION, collectionName);
+          params.set(CoreAdminParams.SHARD, sliceName);
+          params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+          
+          ShardRequest sreq = new ShardRequest();
+          params.set("qt", adminPath);
+          sreq.purpose = 1;
+          // TODO: this does not work if original url had _ in it
+          // We should have a master list
+          String replica = nodeName.replaceAll("_", "/");
+          if (replica.startsWith("http://")) replica = replica.substring(7);
+          sreq.shards = new String[] {replica};
+          sreq.actualShards = sreq.shards;
+          sreq.params = params;
+          
+          shardHandler.submit(sreq, replica, sreq.params);
+        }
+      }
+      
+      int failed = 0;
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          Throwable e = srsp.getException();
+          if (e != null) {
+            // should we retry?
+            // TODO: we should return errors to the client
+            // TODO: what if one fails and others succeed?
+            failed++;
+            log.error("Error talking to shard: " + srsp.getShard(), e);
+          }
+        }
+      } while (srsp != null);
+      
+      // if all calls succeeded, return true
+      if (failed > 0) {
+        return false;
+      }
+      log.info("Successfully created all shards for collection "
+          + collectionName);
+      return true;
+    } catch (Exception ex) {
+      // Expecting that the necessary logging has already been performed
+      return false;
+    }
   }
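To make the new capacity check concrete, a worked example with illustrative numbers: each slice needs numReplica + 1 cores, so the request below asks for more cores than the cluster may host and is rejected.

    // Illustrative numbers, not from the patch:
    int liveNodes = 3, maxShardsPerNode = 2, numSlices = 4, numReplica = 1;
    int numShardsPerSlice = numReplica + 1;                       // 2 cores per slice
    int maxShardsAllowedToCreate = maxShardsPerNode * liveNodes;  // 2 * 3 = 6
    int requestedShardsToCreate = numSlices * numShardsPerSlice;  // 4 * 2 = 8
    // 6 < 8, so createCollection logs the error above and returns false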
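The nested loop places cores round-robin over the shuffled node list: slice i, replica j lands on node ((i - 1) + (j - 1)) % nodeList.size(). A small sketch of the layout this produces (3 slices, replicationFactor 1, 3 nodes; all values illustrative):

    // Prints the placement computed by the loop above for nodes [n0, n1, n2]:
    // shard1_replica1 -> n0   shard1_replica2 -> n1
    // shard2_replica1 -> n1   shard2_replica2 -> n2
    // shard3_replica1 -> n2   shard3_replica2 -> n0
    for (int i = 1; i <= 3; i++) {
      for (int j = 1; j <= 2; j++) {
        System.out.println("shard" + i + "_replica" + j + " -> n"
            + (((i - 1) + (j - 1)) % 3));
      }
    }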
   private boolean collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params) {
     log.info("Executing Collection Cmd : " + params);
-    String name = message.getStr("name");
+    String collectionName = message.getStr("name");
     
+    DocCollection coll = clusterState.getCollection(collectionName);
+    
+    if (coll == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "Could not find collection:" + collectionName);
+    }
-    DocCollection coll = clusterState.getCollection(name);
     
     for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
       Slice slice = entry.getValue();
       Map<String,Replica> shards = slice.getReplicasMap();
@@ -332,4 +364,15 @@ public class OverseerCollectionProcessor implements Runnable {
     }
     return true;
   }
+  
+  private int msgStrToInt(ZkNodeProps message, String key, Integer def)
+      throws Exception {
+    String str = message.getStr(key);
+    try {
+      return str == null ? def : Integer.parseInt(str);
+    } catch (Exception ex) {
+      SolrException.log(log, "Could not parse " + key, ex);
+      throw ex;
+    }
+  }
 }
@@ -179,11 +179,12 @@ public class CollectionsHandler extends RequestHandlerBase {
     Integer numReplicas = req.getParams().getInt(OverseerCollectionProcessor.REPLICATION_FACTOR, 0);
     String name = req.getParams().required().get("name");
     String configName = req.getParams().get("collection.configName");
-    String numShards = req.getParams().get("numShards");
+    String numShards = req.getParams().get(OverseerCollectionProcessor.NUM_SLICES);
+    String maxShardsPerNode = req.getParams().get(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE);
     
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
         OverseerCollectionProcessor.CREATECOLLECTION, OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas.toString(), "name", name,
-        "collection.configName", configName, "numShards", numShards);
+        "collection.configName", configName, OverseerCollectionProcessor.NUM_SLICES, numShards, OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
     
     // TODO: what if you want to block until the collection is available?
     coreContainer.getZkController().getOverseerCollectionQueue().offer(ZkStateReader.toJSON(m));
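One subtlety in the handler change: req.getParams().get(...) returns null when a request omits maxShardsPerNode, and it is the overseer-side msgStrToInt(..., 1) default that then restores the old limit of one shard per node. The queued message ends up shaped roughly like this (the key constants are from the patch, the values are illustrative):

    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
        OverseerCollectionProcessor.CREATECOLLECTION,
        OverseerCollectionProcessor.REPLICATION_FACTOR, "1",
        "name", "mycollection",
        "collection.configName", "myconf",
        OverseerCollectionProcessor.NUM_SLICES, "2",
        OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, "4");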
@@ -385,7 +385,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     server.request(unloadCmd);
     
     // there should be only one shard
-    Slice shard2 = solrj.getZkStateReader().getClusterState().getSlice(collection, "shard2");
+    Slice shard2 = getCommonCloudSolrServer().getZkStateReader().getClusterState().getSlice(collection, "shard2");
     long timeoutAt = System.currentTimeMillis() + 30000;
     while (shard2 != null) {
       if (System.currentTimeMillis() > timeoutAt) {
@@ -394,12 +394,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
       }
       
       Thread.sleep(50);
-      shard2 = solrj.getZkStateReader().getClusterState().getSlice(collection, "shard2");
+      shard2 = getCommonCloudSolrServer().getZkStateReader().getClusterState().getSlice(collection, "shard2");
     }
     
-    Slice shard1 = solrj.getZkStateReader().getClusterState().getSlice(collection, "shard1");
+    Slice shard1 = getCommonCloudSolrServer().getZkStateReader().getClusterState().getSlice(collection, "shard1");
     assertNotNull(shard1);
-    assertTrue(solrj.getZkStateReader().getClusterState().getCollections().contains(collection));
+    assertTrue(getCommonCloudSolrServer().getZkStateReader().getClusterState().getCollections().contains(collection));
     
     // now unload one of the other
     unloadCmd = new Unload(false);
@@ -409,7 +409,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     //printLayout();
     // the collection should be gone
     timeoutAt = System.currentTimeMillis() + 30000;
-    while (solrj.getZkStateReader().getClusterState().getCollections().contains(collection)) {
+    while (getCommonCloudSolrServer().getZkStateReader().getClusterState().getCollections().contains(collection)) {
       if (System.currentTimeMillis() > timeoutAt) {
         printLayout();
         fail("Still found collection");
@@ -437,7 +437,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     createCmd.setDataDir(core1DataDir);
     server.request(createCmd);
     
-    ZkStateReader zkStateReader = solrj.getZkStateReader();
+    ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader();
     
     zkStateReader.updateClusterState(true);
@@ -738,16 +738,17 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     int cnt = atLeast(3);
     
     for (int i = 0; i < cnt; i++) {
-      createCollection(collectionInfos, i,
-          _TestUtil.nextInt(random(), 0, shardCount) + 1,
-          _TestUtil.nextInt(random(), 0, 3) + 1);
+      int numShards = _TestUtil.nextInt(random(), 0, shardCount) + 1;
+      int numReplica = _TestUtil.nextInt(random(), 0, 3) + 1;
+      int maxShardsPerNode = (((numShards * (numReplica + 1)) / getCommonCloudSolrServer().getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
+      createCollection(collectionInfos, i, numShards, numReplica, maxShardsPerNode);
     }
     
     Set<Entry<String,List<Integer>>> collectionInfosEntrySet = collectionInfos.entrySet();
     for (Entry<String,List<Integer>> entry : collectionInfosEntrySet) {
       String collection = entry.getKey();
       List<Integer> list = entry.getValue();
-      checkForCollection(collection, list.get(0));
+      checkForCollection(collection, list);
       
       String url = getUrlFromZk(collection);
@@ -756,7 +757,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
       // poll for a second - it can take a moment before we are ready to serve
       waitForNon403or404or503(collectionClient);
     }
-    ZkStateReader zkStateReader = solrj.getZkStateReader();
+    ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader();
     for (int j = 0; j < cnt; j++) {
       waitForRecoveriesToFinish("awholynewcollection_" + j, zkStateReader, false);
     }
@@ -851,7 +852,10 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     request.setPath("/admin/collections");
     createNewSolrServer("", baseUrl).request(request);
     
-    checkForCollection(collectionName, 1);
+    List<Integer> list = new ArrayList<Integer>(2);
+    list.add(1);
+    list.add(1);
+    checkForCollection(collectionName, list);
     
     url = getUrlFromZk(collectionName);
@@ -863,21 +867,34 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     for (int j = 0; j < cnt; j++) {
       waitForRecoveriesToFinish(collectionName, zkStateReader, false);
     }
+    
+    // test maxShardsPerNode
+    int liveNodes = getCommonCloudSolrServer().getZkStateReader().getClusterState().getLiveNodes().size();
+    int numShards = (liveNodes/2) + 1;
+    int numReplica = 1;
+    int maxShardsPerNode = 1;
+    collectionInfos = new HashMap<String,List<Integer>>();
+    createCollection(collectionInfos, cnt, numShards, numReplica, maxShardsPerNode);
+    
+    // TODO: enable this check after removing the 60 second wait in it
+    //checkCollectionIsNotCreated(collectionInfos.keySet().iterator().next());
   }
   
   protected void createCollection(Map<String,List<Integer>> collectionInfos,
-      int i, int numShards, int numReplicas) throws SolrServerException, IOException {
+      int i, int numShards, int numReplica, int maxShardsPerNode) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionAction.CREATE.toString());
     
-    params.set("numShards", numShards);
-    params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplicas);
+    params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
+    params.set(OverseerCollectionProcessor.REPLICATION_FACTOR, numReplica);
+    params.set(OverseerCollectionProcessor.MAX_SHARDS_PER_NODE, maxShardsPerNode);
     String collectionName = "awholynewcollection_" + i;
     int clientIndex = random().nextInt(2);
     List<Integer> list = new ArrayList<Integer>();
     list.add(numShards);
-    list.add(numReplicas);
+    list.add(numReplica);
+    list.add(maxShardsPerNode);
     collectionInfos.put(collectionName, list);
     params.set("name", collectionName);
     SolrRequest request = new QueryRequest(params);
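After this change each collection's bookkeeping list holds three entries, [numShards, numReplica, maxShardsPerNode]; the checkCollectionExpectations() helper added below reads only the first two (slice count and replicas per slice), while the third rides along for reference. A sketch with illustrative values:

    List<Integer> list = new ArrayList<Integer>();
    list.add(2); // numShards  -> expectedSlices (index 0)
    list.add(1); // numReplica -> expectedShardsPerSlice = numReplica + 1 (index 1)
    list.add(4); // maxShardsPerNode (index 2, unused by the check)
    collectionInfos.put("awholynewcollection_0", list);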
@@ -923,7 +940,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   
   private void collectStartTimes(String collectionName,
       Map<String,Long> urlToTime) throws SolrServerException, IOException {
-    Map<String,DocCollection> collections = solrj.getZkStateReader()
+    Map<String,DocCollection> collections = getCommonCloudSolrServer().getZkStateReader()
         .getClusterState().getCollectionStates();
     if (collections.containsKey(collectionName)) {
       Map<String,Slice> slices = collections.get(collectionName).getSlicesMap();
@@ -951,7 +968,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   }
   
   private String getUrlFromZk(String collection) {
-    ClusterState clusterState = solrj.getZkStateReader().getClusterState();
+    ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
     Map<String,Slice> slices = clusterState.getCollectionStates().get(collection).getSlicesMap();
     
     if (slices == null) {
@@ -965,7 +982,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     for (Map.Entry<String,Replica> shardEntry : shardEntries) {
       final ZkNodeProps node = shardEntry.getValue();
       if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
-        return new ZkCoreNodeProps(node).getCoreUrl();
+        return ZkCoreNodeProps.getCoreUrl(node.getStr(ZkStateReader.BASE_URL_PROP), collection); //new ZkCoreNodeProps(node).getCoreUrl();
       }
     }
   }
@@ -974,7 +991,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   }
   
   private ZkCoreNodeProps getLeaderUrlFromZk(String collection, String slice) {
-    ClusterState clusterState = solrj.getZkStateReader().getClusterState();
+    ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
     ZkNodeProps leader = clusterState.getLeader(collection, slice);
     if (leader == null) {
       throw new RuntimeException("Could not find leader:" + collection + " " + slice);
@@ -1008,33 +1025,60 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     fail("Could not find the new collection - " + exp.code() + " : " + collectionClient.getBaseURL());
   }
   
-  private void checkForCollection(String collectionName, int expectedSlices)
-      throws Exception {
-    // check for an expectedSlices new collection - we poll the state
-    long timeoutAt = System.currentTimeMillis() + 60000;
-    boolean found = false;
-    boolean sliceMatch = false;
-    while (System.currentTimeMillis() < timeoutAt) {
-      ClusterState clusterState = solrj.getZkStateReader().getClusterState();
+  private String checkCollectionExpectations(String collectionName, List<Integer> numShardsNumReplicaList) {
+    ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
+    
+    int expectedSlices = numShardsNumReplicaList.get(0);
+    // The Math.min thing is here, because we expect the replication factor to be reduced if there are not enough live nodes to spread all shards of a collection over different nodes
+    int expectedShardsPerSlice = numShardsNumReplicaList.get(1) + 1;
+    int expectedTotalShards = expectedSlices * expectedShardsPerSlice;
+    
     Map<String,DocCollection> collections = clusterState
         .getCollectionStates();
     if (collections.containsKey(collectionName)) {
       Map<String,Slice> slices = collections.get(collectionName).getSlicesMap();
-      // did we find expectedSlices slices/shards?
-      if (slices.size() == expectedSlices) {
-        sliceMatch = true;
+      
+      if (slices.size() != expectedSlices) {
+        return "Found new collection " + collectionName + ", but mismatch on number of slices. Expected: " + expectedSlices + ", actual: " + slices.size();
       }
+      int totalShards = 0;
+      for (String sliceName : slices.keySet()) {
+        totalShards += slices.get(sliceName).getReplicas().size();
+      }
+      if (totalShards != expectedTotalShards) {
+        return "Found new collection " + collectionName + " with correct number of slices, but mismatch on number of shards. Expected: " + expectedTotalShards + ", actual: " + totalShards;
+      }
-      found = true;
+      return null;
+    } else {
+      return "Could not find new collection " + collectionName;
     }
   }
+  
+  private void checkForCollection(String collectionName, List<Integer> numShardsNumReplicaList)
+      throws Exception {
+    // check for an expectedSlices new collection - we poll the state
+    long timeoutAt = System.currentTimeMillis() + 120000;
+    boolean success = false;
+    String checkResult = "Didn't get to perform a single check";
+    while (System.currentTimeMillis() < timeoutAt) {
+      checkResult = checkCollectionExpectations(collectionName, numShardsNumReplicaList);
+      if (checkResult == null) {
+        success = true;
+        break;
+      }
+      Thread.sleep(500);
+    }
-    if (!found) {
-      if (!sliceMatch) {
-        fail("Could not find new " + expectedSlices + " slice collection called " + collectionName);
+    if (!success) {
+      super.printLayout();
+      fail(checkResult);
     }
   }
+  
+  private void checkCollectionIsNotCreated(String collectionName)
+      throws Exception {
+    // nocommit
+    Thread.sleep(60000);
+    assertFalse(collectionName + " not supposed to exist", getCommonCloudSolrServer().getZkStateReader().getClusterState().getCollections().contains(collectionName));
+  }
   
   private void checkForMissingCollection(String collectionName)
@@ -1043,8 +1087,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     long timeoutAt = System.currentTimeMillis() + 15000;
     boolean found = true;
     while (System.currentTimeMillis() < timeoutAt) {
-      solrj.getZkStateReader().updateClusterState(true);
-      ClusterState clusterState = solrj.getZkStateReader().getClusterState();
+      getCommonCloudSolrServer().getZkStateReader().updateClusterState(true);
+      ClusterState clusterState = getCommonCloudSolrServer().getZkStateReader().getClusterState();
       Map<String,DocCollection> collections = clusterState
           .getCollectionStates();
       if (!collections.containsKey(collectionName)) {
@@ -1208,9 +1252,9 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     
     // no one should be recovering
-    waitForRecoveriesToFinish(oneInstanceCollection2, solrj.getZkStateReader(), false, true);
+    waitForRecoveriesToFinish(oneInstanceCollection2, getCommonCloudSolrServer().getZkStateReader(), false, true);
     
-    assertAllActive(oneInstanceCollection2, solrj.getZkStateReader());
+    assertAllActive(oneInstanceCollection2, getCommonCloudSolrServer().getZkStateReader());
     
     //printLayout();
@@ -1231,7 +1275,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     
     query.set("collection", oneInstanceCollection2);
     query.set("distrib", true);
-    long allDocs = solrj.query(query).getResults().getNumFound();
+    long allDocs = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     
     // System.out.println("1:" + oneDocs);
     // System.out.println("2:" + twoDocs);
@@ -1245,7 +1289,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     assertEquals(3, allDocs);
     
     // we added a role of none on these creates - check for it
-    ZkStateReader zkStateReader = solrj.getZkStateReader();
+    ZkStateReader zkStateReader = getCommonCloudSolrServer().getZkStateReader();
     zkStateReader.updateClusterState(true);
     Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(oneInstanceCollection2);
     assertNotNull(slices);
@@ -1253,7 +1297,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     assertEquals("none", roles);
     
     
-    ZkCoreNodeProps props = new ZkCoreNodeProps(solrj.getZkStateReader().getClusterState().getLeader(oneInstanceCollection2, "slice1"));
+    ZkCoreNodeProps props = new ZkCoreNodeProps(getCommonCloudSolrServer().getZkStateReader().getClusterState().getLeader(oneInstanceCollection2, "slice1"));
     
     // now test that unloading a core gets us a new leader
     HttpSolrServer server = new HttpSolrServer(baseUrl);
@@ -1316,8 +1360,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     SolrServer client3 = createNewSolrServer(oneInstanceCollection + "3", baseUrl);
     SolrServer client4 = createNewSolrServer(oneInstanceCollection + "4", baseUrl);
     
-    waitForRecoveriesToFinish(oneInstanceCollection, solrj.getZkStateReader(), false);
-    assertAllActive(oneInstanceCollection, solrj.getZkStateReader());
+    waitForRecoveriesToFinish(oneInstanceCollection, getCommonCloudSolrServer().getZkStateReader(), false);
+    assertAllActive(oneInstanceCollection, getCommonCloudSolrServer().getZkStateReader());
     
     client2.add(getDoc(id, "1"));
     client3.add(getDoc(id, "2"));
@@ -1333,7 +1377,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     
     query.set("collection", oneInstanceCollection);
     query.set("distrib", true);
-    long allDocs = solrj.query(query).getResults().getNumFound();
+    long allDocs = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     
     // System.out.println("1:" + oneDocs);
     // System.out.println("2:" + twoDocs);
@@ -1401,21 +1445,21 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     indexDoc("collection2", getDoc(id, "10000000"));
     indexDoc("collection2", getDoc(id, "10000001"));
     indexDoc("collection2", getDoc(id, "10000003"));
-    solrj.setDefaultCollection("collection2");
-    solrj.add(getDoc(id, "10000004"));
-    solrj.setDefaultCollection(null);
+    getCommonCloudSolrServer().setDefaultCollection("collection2");
+    getCommonCloudSolrServer().add(getDoc(id, "10000004"));
+    getCommonCloudSolrServer().setDefaultCollection(null);
     
     indexDoc("collection3", getDoc(id, "20000000"));
     indexDoc("collection3", getDoc(id, "20000001"));
-    solrj.setDefaultCollection("collection3");
-    solrj.add(getDoc(id, "10000005"));
-    solrj.setDefaultCollection(null);
+    getCommonCloudSolrServer().setDefaultCollection("collection3");
+    getCommonCloudSolrServer().add(getDoc(id, "10000005"));
+    getCommonCloudSolrServer().setDefaultCollection(null);
     
     otherCollectionClients.get("collection2").get(0).commit();
     otherCollectionClients.get("collection3").get(0).commit();
     
-    solrj.setDefaultCollection("collection1");
-    long collection1Docs = solrj.query(new SolrQuery("*:*")).getResults()
+    getCommonCloudSolrServer().setDefaultCollection("collection1");
+    long collection1Docs = getCommonCloudSolrServer().query(new SolrQuery("*:*")).getResults()
         .getNumFound();
     
     long collection2Docs = otherCollectionClients.get("collection2").get(0)
@@ -1436,19 +1480,19 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     assertEquals(collection1Docs + collection2Docs + collection3Docs, found);
     
     // try to search multiple with cloud client
-    found = solrj.query(query).getResults().getNumFound();
+    found = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     assertEquals(collection1Docs + collection2Docs + collection3Docs, found);
     
     query.set("collection", "collection2,collection3");
-    found = solrj.query(query).getResults().getNumFound();
+    found = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     assertEquals(collection2Docs + collection3Docs, found);
     
     query.set("collection", "collection3");
-    found = solrj.query(query).getResults().getNumFound();
+    found = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     assertEquals(collection3Docs, found);
     
     query.remove("collection");
-    found = solrj.query(query).getResults().getNumFound();
+    found = getCommonCloudSolrServer().query(query).getResults().getNumFound();
     assertEquals(collection1Docs, found);
     
     assertEquals(collection3Docs, collection2Docs - 1);
@@ -1524,7 +1568,21 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     }
   }
   
-  volatile CloudSolrServer solrj;
+  volatile CloudSolrServer commondCloudSolrServer;
+  private CloudSolrServer getCommonCloudSolrServer() {
+    if (commondCloudSolrServer == null) {
+      synchronized(this) {
+        try {
+          commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress());
+          commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
+          commondCloudSolrServer.connect();
+        } catch (MalformedURLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return commondCloudSolrServer;
+  }
   
   @Override
   protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
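A caveat on the lazy initializer above: the field is volatile, but there is no second null check inside the synchronized block, so two racing threads can each construct a CloudSolrServer and the losing instance is never shut down. A conventional double-checked-locking variant would look like this (a hypothetical fix, not part of the patch):

    private CloudSolrServer getCommonCloudSolrServer() {
      if (commondCloudSolrServer == null) {
        synchronized (this) {
          if (commondCloudSolrServer == null) { // re-check under the lock
            try {
              CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
              server.setDefaultCollection(DEFAULT_COLLECTION);
              server.connect();
              commondCloudSolrServer = server; // publish only when fully initialized
            } catch (MalformedURLException e) {
              throw new RuntimeException(e);
            }
          }
        }
      }
      return commondCloudSolrServer;
    }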
@@ -1532,31 +1590,18 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     if (r.nextBoolean())
       return super.queryServer(params);
     
-    // use the distributed solrj client
-    if (solrj == null) {
-      synchronized(this) {
-        try {
-          CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
-          server.setDefaultCollection(DEFAULT_COLLECTION);
-          solrj = server;
-        } catch (MalformedURLException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-    
     if (r.nextBoolean())
       params.set("collection",DEFAULT_COLLECTION);
     
-    QueryResponse rsp = solrj.query(params);
+    QueryResponse rsp = getCommonCloudSolrServer().query(params);
     return rsp;
   }
   
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
-    if (solrj != null) {
-      solrj.shutdown();
+    if (commondCloudSolrServer != null) {
+      commondCloudSolrServer.shutdown();
     }
     System.clearProperty("numShards");
     System.clearProperty("zkHost");