mirror of https://github.com/apache/lucene.git
SOLR-13399: SPLITSHARD splitByPrefix for compositeId
This commit is contained in:
parent
b619bcd1fa
commit
c5cabf1e03
|
@ -81,6 +81,10 @@ New Features
|
|||
* SOLR-13375: Two dimensional routed aliases are now available for organizing collections based on the data values
|
||||
of two fields. (Gus Heck)
|
||||
|
||||
* SOLR-13399: SPLITSHARD implements a new splitByPrefix option that takes into account the actual document distribution
|
||||
when using compositeIds. The id prefix should be indexed into the "id_prefix" field for this feature to work.
|
||||
(yonik)
|
||||
|
||||
|
||||
Improvements
|
||||
----------------------
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.solr.common.params.CommonParams;
|
|||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.SimpleOrderedMap;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.update.SolrIndexSplitter;
|
||||
|
@ -101,6 +102,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
}
|
||||
|
||||
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results) throws Exception {
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
|
||||
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
|
||||
String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
|
||||
SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
|
||||
|
@ -202,7 +205,50 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
|
||||
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
||||
|
||||
|
||||
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, true)) {
|
||||
t = timings.sub("getRanges");
|
||||
|
||||
log.info("Requesting split ranges from replica " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
|
||||
+ collectionName + " on " + parentShardLeader);
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
|
||||
params.set(CoreAdminParams.GET_RANGES, "true");
|
||||
params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
|
||||
params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
|
||||
int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
|
||||
params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
|
||||
|
||||
{
|
||||
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
|
||||
shardRequestTracker.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
|
||||
SimpleOrderedMap<Object> getRangesResults = new SimpleOrderedMap<>();
|
||||
shardRequestTracker.processResponses(getRangesResults, shardHandler, true, "SPLITSHARD failed to invoke SPLIT.getRanges core admin command");
|
||||
|
||||
// Extract the recommended splits from the shard response (if it exists)
|
||||
// example response: getRangesResults={success={127.0.0.1:62086_solr={responseHeader={status=0,QTime=1},ranges=10-20,3a-3f}}}
|
||||
NamedList successes = (NamedList)getRangesResults.get("success");
|
||||
if (successes != null && successes.size() > 0) {
|
||||
NamedList shardRsp = (NamedList)successes.getVal(0);
|
||||
String splits = (String)shardRsp.get(CoreAdminParams.RANGES);
|
||||
if (splits != null) {
|
||||
log.info("Resulting split range to be used is " + splits);
|
||||
// change the message to use the recommended split ranges
|
||||
message = message.plus(CoreAdminParams.RANGES, splits);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.stop();
|
||||
}
|
||||
|
||||
|
||||
t = timings.sub("fillRanges");
|
||||
|
||||
String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
|
||||
t.stop();
|
||||
|
||||
|
@ -241,7 +287,6 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
collection = clusterState.getCollection(collectionName);
|
||||
}
|
||||
|
||||
final String asyncId = message.getStr(ASYNC);
|
||||
String nodeName = parentShardLeader.getNodeName();
|
||||
|
||||
t = timings.sub("createSubSlicesAndLeadersInState");
|
||||
|
@ -293,8 +338,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
|
|||
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
|
||||
|
||||
{
|
||||
final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
|
||||
syncRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders");
|
||||
|
|
|
@ -17,16 +17,24 @@
|
|||
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.lucene.index.MultiTerms;
|
||||
import org.apache.lucene.index.Terms;
|
||||
import org.apache.lucene.index.TermsEnum;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ZkShardTerms;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
@ -36,13 +44,16 @@ import org.apache.solr.common.params.SolrParams;
|
|||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
import org.apache.solr.update.SolrIndexSplitter;
|
||||
import org.apache.solr.update.SplitIndexCommand;
|
||||
import org.apache.solr.util.RefCounted;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
|
||||
import static org.apache.solr.common.params.CommonParams.PATH;
|
||||
import static org.apache.solr.common.params.CoreAdminParams.GET_RANGES;
|
||||
|
||||
|
||||
class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
||||
|
@ -52,6 +63,16 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
@Override
|
||||
public void execute(CoreAdminHandler.CallInfo it) throws Exception {
|
||||
SolrParams params = it.req.getParams();
|
||||
|
||||
String splitKey = params.get("split.key");
|
||||
String[] newCoreNames = params.getParams("targetCore");
|
||||
String cname = params.get(CoreAdminParams.CORE, "");
|
||||
|
||||
if ( params.getBool(GET_RANGES, false) ) {
|
||||
handleGetRanges(it, cname);
|
||||
return;
|
||||
}
|
||||
|
||||
List<DocRouter.Range> ranges = null;
|
||||
|
||||
String[] pathsArr = params.getParams(PATH);
|
||||
|
@ -71,9 +92,6 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
}
|
||||
}
|
||||
}
|
||||
String splitKey = params.get("split.key");
|
||||
String[] newCoreNames = params.getParams("targetCore");
|
||||
String cname = params.get(CoreAdminParams.CORE, "");
|
||||
|
||||
if ((pathsArr == null || pathsArr.length == 0) && (newCoreNames == null || newCoreNames.length == 0)) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Either path or targetCore param must be specified");
|
||||
|
@ -166,4 +184,228 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// This is called when splitByPrefix is used.
|
||||
// The overseer called us to get recommended splits taking into
|
||||
// account actual document distribution over the hash space.
|
||||
private void handleGetRanges(CoreAdminHandler.CallInfo it, String coreName) throws Exception {
|
||||
|
||||
SolrCore parentCore = it.handler.coreContainer.getCore(coreName);
|
||||
if (parentCore == null) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown core " + coreName);
|
||||
}
|
||||
|
||||
RefCounted<SolrIndexSearcher> searcherHolder = parentCore.getRealtimeSearcher();
|
||||
|
||||
try {
|
||||
if (!it.handler.coreContainer.isZooKeeperAware()) {
|
||||
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Shard splitByPrefix requires SolrCloud mode.");
|
||||
} else {
|
||||
String routeFieldName = "id";
|
||||
String prefixField = "id_prefix";
|
||||
|
||||
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
|
||||
String collectionName = parentCore.getCoreDescriptor().getCloudDescriptor().getCollectionName();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
String sliceName = parentCore.getCoreDescriptor().getCloudDescriptor().getShardId();
|
||||
Slice slice = collection.getSlice(sliceName);
|
||||
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
|
||||
DocRouter.Range currentRange = slice.getRange();
|
||||
|
||||
Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
|
||||
if (routerObj instanceof Map) {
|
||||
Map routerProps = (Map) routerObj;
|
||||
routeFieldName = (String) routerProps.get("field");
|
||||
}
|
||||
|
||||
Collection<RangeCount> counts = getHashHistogram(searcherHolder.get(), prefixField, router, collection);
|
||||
Collection<DocRouter.Range> splits = getSplits(counts, currentRange);
|
||||
String splitString = toSplitString(splits);
|
||||
|
||||
if (splitString == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
it.rsp.add(CoreAdminParams.RANGES, splitString);
|
||||
}
|
||||
} finally {
|
||||
if (searcherHolder != null) searcherHolder.decref();
|
||||
if (parentCore != null) parentCore.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
static class RangeCount implements Comparable<RangeCount> {
|
||||
DocRouter.Range range;
|
||||
int count;
|
||||
|
||||
public RangeCount(DocRouter.Range range, int count) {
|
||||
this.range = range;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return range.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (!(obj instanceof RangeCount)) return false;
|
||||
return this.range.equals( ((RangeCount)obj).range );
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RangeCount o) {
|
||||
return this.range.compareTo(o.range);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static String toSplitString(Collection<DocRouter.Range> splits) throws Exception {
|
||||
if (splits == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (DocRouter.Range range : splits) {
|
||||
if (sb.length() > 0) {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(range);
|
||||
}
|
||||
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
// Returns a list of range counts sorted by the range lower bound
|
||||
static Collection<RangeCount> getHashHistogram(SolrIndexSearcher searcher, String prefixField, DocRouter router, DocCollection collection) throws IOException {
|
||||
TreeMap<DocRouter.Range,RangeCount> counts = new TreeMap<>();
|
||||
|
||||
Terms terms = MultiTerms.getTerms(searcher.getIndexReader(), prefixField);
|
||||
if (terms == null) {
|
||||
return counts.values();
|
||||
}
|
||||
|
||||
TermsEnum termsEnum = terms.iterator();
|
||||
for (;;) {
|
||||
BytesRef term = termsEnum.next();
|
||||
if (term == null) break;
|
||||
|
||||
String termStr = term.utf8ToString();
|
||||
int firstSep = termStr.indexOf(CompositeIdRouter.SEPARATOR);
|
||||
// truncate to first separator since we don't support multiple levels currently
|
||||
if (firstSep != termStr.length()-1 && firstSep > 0) {
|
||||
termStr = termStr.substring(0, firstSep+1);
|
||||
}
|
||||
DocRouter.Range range = router.getSearchRangeSingle(termStr, null, collection);
|
||||
int numDocs = termsEnum.docFreq();
|
||||
|
||||
RangeCount rangeCount = new RangeCount(range, numDocs);
|
||||
|
||||
RangeCount prev = counts.put(rangeCount.range, rangeCount);
|
||||
if (prev != null) {
|
||||
// we hit a hash collision or truncated a prefix to first level, so add the buckets together.
|
||||
rangeCount.count += prev.count;
|
||||
}
|
||||
}
|
||||
|
||||
return counts.values();
|
||||
}
|
||||
|
||||
|
||||
// returns the list of recommended splits, or null if there is not enough information
|
||||
static Collection<DocRouter.Range> getSplits(Collection<RangeCount> rawCounts, DocRouter.Range currentRange) throws Exception {
|
||||
|
||||
int totalCount = 0;
|
||||
RangeCount biggest = null; // keep track of the largest in case we need to split it out into it's own shard
|
||||
RangeCount last = null; // keep track of what the last range is
|
||||
|
||||
// Remove counts that don't overlap with currentRange (can happen if someone overrode document routing)
|
||||
List<RangeCount> counts = new ArrayList<>(rawCounts.size());
|
||||
for (RangeCount rangeCount : rawCounts) {
|
||||
if (!rangeCount.range.overlaps(currentRange)) {
|
||||
continue;
|
||||
}
|
||||
totalCount += rangeCount.count;
|
||||
if (biggest == null || rangeCount.count > biggest.count) {
|
||||
biggest = rangeCount;
|
||||
}
|
||||
counts.add(rangeCount);
|
||||
last = rangeCount;
|
||||
}
|
||||
|
||||
if (counts.size() == 0) {
|
||||
// we don't have any data to go off of, so do the split the normal way
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
List<DocRouter.Range> targetRanges = new ArrayList<>();
|
||||
|
||||
if (counts.size() == 1) {
|
||||
// We have a single range, so we should split it.
|
||||
|
||||
// It may already be a partial range, so figure that out
|
||||
int lower = Math.max(last.range.min, currentRange.min);
|
||||
int upper = Math.min(last.range.max, currentRange.max);
|
||||
int mid = lower + (upper-lower)/2;
|
||||
if (mid == lower || mid == upper) {
|
||||
// shard too small... this should pretty much never happen, but use default split logic if it does.
|
||||
return null;
|
||||
}
|
||||
|
||||
// Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
|
||||
DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, mid);
|
||||
DocRouter.Range upperRange = new DocRouter.Range(mid+1, currentRange.max);
|
||||
targetRanges.add(lowerRange);
|
||||
targetRanges.add(upperRange);
|
||||
|
||||
return targetRanges;
|
||||
}
|
||||
|
||||
// We have at least two ranges, so we want to partition the ranges
|
||||
// and avoid splitting any individual range.
|
||||
|
||||
int targetCount = totalCount / 2;
|
||||
RangeCount middle = null;
|
||||
RangeCount prev = null;
|
||||
int currCount = 0;
|
||||
for (RangeCount rangeCount : counts) {
|
||||
currCount += rangeCount.count;
|
||||
if (currCount >= targetCount) { // this should at least be true on the last range
|
||||
middle = rangeCount;
|
||||
break;
|
||||
}
|
||||
prev = rangeCount;
|
||||
}
|
||||
|
||||
// check if using the range before the middle one would make a better split point
|
||||
int overError = currCount - targetCount; // error if we include middle in first split
|
||||
int underError = targetCount - (currCount - middle.count); // error if we include middle in second split
|
||||
if (underError < overError) {
|
||||
middle = prev;
|
||||
}
|
||||
|
||||
// if the middle range turned out to be the last one, pick the one before it instead
|
||||
if (middle == last) {
|
||||
middle = prev;
|
||||
}
|
||||
|
||||
// Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
|
||||
DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, middle.range.max);
|
||||
DocRouter.Range upperRange = new DocRouter.Range(middle.range.max+1, currentRange.max);
|
||||
targetRanges.add(lowerRange);
|
||||
targetRanges.add(upperRange);
|
||||
|
||||
return targetRanges;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -36,4 +36,15 @@
|
|||
</fieldType>
|
||||
|
||||
<uniqueKey>id</uniqueKey>
|
||||
|
||||
<!-- needed for splitByPrefix -->
|
||||
<field name="id_prefix" type="composite_id_prefix" indexed="true" stored="false"/>
|
||||
<copyField source="id" dest="id_prefix"/>
|
||||
<fieldtype name="composite_id_prefix" class="solr.TextField">
|
||||
<analyzer>
|
||||
<tokenizer class="solr.PatternTokenizerFactory" pattern=".*!" group="0"/>
|
||||
</analyzer>
|
||||
</fieldtype>
|
||||
|
||||
|
||||
</schema>
|
||||
|
|
|
@ -0,0 +1,259 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.cloud.api.collections;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.CompositeIdRouter;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
//
|
||||
// This class tests higher level SPLITSHARD functionality when splitByPrefix is specified.
|
||||
// See SplitHandlerTest for random tests of lower-level split selection logic.
|
||||
//
|
||||
public class SplitByPrefixTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final String COLLECTION_NAME = "c1";
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
System.setProperty("managed.schema.mutable", "true"); // needed by cloud-managed config set
|
||||
|
||||
configureCluster(1)
|
||||
.addConfig("conf", configset("cloud-managed")) // cloud-managed has the id copyfield to id_prefix
|
||||
.configure();
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@After
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
cluster.deleteAllCollections();
|
||||
}
|
||||
|
||||
static class Prefix implements Comparable<Prefix> {
|
||||
String key;
|
||||
DocRouter.Range range;
|
||||
|
||||
@Override
|
||||
public int compareTo(Prefix o) {
|
||||
return range.compareTo(o.range);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "prefix=" + key + ",range="+range;
|
||||
}
|
||||
}
|
||||
|
||||
// find prefixes (shard keys) matching certain criteria
|
||||
public List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
|
||||
CompositeIdRouter router = new CompositeIdRouter();
|
||||
|
||||
ArrayList<Prefix> prefixes = new ArrayList<>();
|
||||
int maxTries = 1000000;
|
||||
int numFound = 0;
|
||||
for (int i=0; i<maxTries; i++) {
|
||||
String shardKey = Integer.toHexString(i)+"!";
|
||||
DocRouter.Range range = router.getSearchRangeSingle(shardKey, null, null);
|
||||
int lower = range.min;
|
||||
if (lower >= lowerBound && lower <= upperBound) {
|
||||
Prefix prefix = new Prefix();
|
||||
prefix.key = shardKey;
|
||||
prefix.range = range;
|
||||
prefixes.add(prefix);
|
||||
if (++numFound >= numToFind) break;
|
||||
}
|
||||
}
|
||||
|
||||
Collections.sort(prefixes);
|
||||
|
||||
return prefixes;
|
||||
}
|
||||
|
||||
// remove duplicate prefixes from the sorted prefix list
|
||||
List<Prefix> removeDups(List<Prefix> prefixes) {
|
||||
ArrayList<Prefix> result = new ArrayList<>();
|
||||
Prefix last = null;
|
||||
for (Prefix prefix : prefixes) {
|
||||
if (last!=null && prefix.range.equals(last.range)) {
|
||||
continue;
|
||||
}
|
||||
last = prefix;
|
||||
result.add(prefix);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Randomly add a second level prefix to test that
|
||||
// they are all collapsed to a single bucket. This behavior should change if/when counting support
|
||||
// for more levels of compositeId values
|
||||
SolrInputDocument getDoc(String prefix, String unique) {
|
||||
String secondLevel = "";
|
||||
if (random().nextBoolean()) {
|
||||
secondLevel="" + random().nextInt(2) + "!";
|
||||
}
|
||||
return sdoc("id", prefix + secondLevel + unique);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void doTest() throws IOException, SolrServerException {
|
||||
CollectionAdminRequest
|
||||
.createCollection(COLLECTION_NAME, "conf", 1, 1)
|
||||
.setMaxShardsPerNode(100)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(COLLECTION_NAME, 1, 1);
|
||||
|
||||
|
||||
CloudSolrClient client = cluster.getSolrClient();
|
||||
client.setDefaultCollection(COLLECTION_NAME);
|
||||
|
||||
// splitting an empty collection by prefix should still work (i.e. fall back to old method of just dividing the hash range
|
||||
|
||||
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
|
||||
.setNumSubShards(2)
|
||||
.setSplitByPrefix(true)
|
||||
.setShardName("shard1");
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
COLLECTION_NAME, activeClusterShape(2, 3)); // expectedReplicas==3 because original replica still exists (just inactive)
|
||||
|
||||
|
||||
List<Prefix> prefixes = findPrefixes(20, 0, 0x00ffffff);
|
||||
List<Prefix> uniquePrefixes = removeDups(prefixes);
|
||||
if (uniquePrefixes.size() % 2 == 1) { // make it an even sized list so we can split it exactly in two
|
||||
uniquePrefixes.remove(uniquePrefixes.size()-1);
|
||||
}
|
||||
log.info("Unique prefixes: " + uniquePrefixes);
|
||||
|
||||
for (Prefix prefix : uniquePrefixes) {
|
||||
client.add( getDoc(prefix.key, "doc1") );
|
||||
client.add( getDoc(prefix.key, "doc2") );
|
||||
}
|
||||
client.commit();
|
||||
|
||||
|
||||
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
|
||||
.setSplitByPrefix(true)
|
||||
.setShardName("shard1_1"); // should start out with the range of 0-7fffffff
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
COLLECTION_NAME, activeClusterShape(3, 5));
|
||||
|
||||
// OK, now let's check that the correct split point was chosen
|
||||
// We can use the router to find the shards for the middle prefixes and they should be different.
|
||||
|
||||
DocCollection collection = client.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
|
||||
Collection<Slice> slices1 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(uniquePrefixes.size()/2 - 1).key, null, collection);
|
||||
Collection<Slice> slices2 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(uniquePrefixes.size()/2 ).key, null, collection);
|
||||
|
||||
Slice slice1 = slices1.iterator().next();
|
||||
Slice slice2 = slices2.iterator().next();
|
||||
|
||||
assertTrue(slices1.size() == 1 && slices2.size() == 1);
|
||||
assertTrue(slice1 != slice2);
|
||||
|
||||
|
||||
//
|
||||
// now lets add enough documents to the first prefix to get it split out on it's own
|
||||
//
|
||||
for (int i=0; i<uniquePrefixes.size(); i++) {
|
||||
client.add( getDoc(uniquePrefixes.get(0).key, "doc"+(i+100)));
|
||||
}
|
||||
client.commit();
|
||||
|
||||
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
|
||||
.setSplitByPrefix(true)
|
||||
.setShardName(slice1.getName());
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
COLLECTION_NAME, activeClusterShape(4, 7));
|
||||
|
||||
collection = client.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
|
||||
slices1 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(0).key, null, collection);
|
||||
slices2 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(1).key, null, collection);
|
||||
|
||||
slice1 = slices1.iterator().next();
|
||||
slice2 = slices2.iterator().next();
|
||||
|
||||
assertTrue(slices1.size() == 1 && slices2.size() == 1);
|
||||
assertTrue(slice1 != slice2);
|
||||
|
||||
|
||||
// Now if we call split (with splitByPrefix) on a shard that has a single prefix, it should split it in half
|
||||
|
||||
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
|
||||
.setSplitByPrefix(true)
|
||||
.setShardName(slice1.getName());
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
COLLECTION_NAME, activeClusterShape(5, 9));
|
||||
|
||||
collection = client.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
|
||||
slices1 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(0).key, null, collection);
|
||||
slice1 = slices1.iterator().next();
|
||||
|
||||
assertTrue(slices1.size() == 2);
|
||||
|
||||
//
|
||||
// split one more time, this time on a partial prefix/bucket
|
||||
//
|
||||
splitShard = CollectionAdminRequest.splitShard(COLLECTION_NAME)
|
||||
.setSplitByPrefix(true)
|
||||
.setShardName(slice1.getName());
|
||||
splitShard.process(client);
|
||||
waitForState("Timed out waiting for sub shards to be active.",
|
||||
COLLECTION_NAME, activeClusterShape(6, 11));
|
||||
|
||||
collection = client.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
|
||||
slices1 = collection.getRouter().getSearchSlicesSingle(uniquePrefixes.get(0).key, null, collection);
|
||||
|
||||
assertTrue(slices1.size() == 3);
|
||||
|
||||
// System.err.println("### STATE=" + cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME));
|
||||
// System.err.println("### getActiveSlices()=" + cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getActiveSlices());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,218 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.handler.admin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.DocRouter;
|
||||
import org.junit.Test;
|
||||
|
||||
// test low level splitByPrefix range recommendations.
|
||||
// This is here to access package private methods.
|
||||
// See SplitByPrefixTest for cloud level tests of SPLITSHARD that use this by passing getRanges with the SPLIT command
|
||||
public class SplitHandlerTest extends SolrTestCaseJ4 {
|
||||
|
||||
|
||||
void verifyContiguous(Collection<DocRouter.Range> results, DocRouter.Range currentRange) {
|
||||
if (results == null) return;
|
||||
|
||||
assertTrue(results.size() > 1);
|
||||
|
||||
DocRouter.Range prev = null;
|
||||
for (DocRouter.Range range : results) {
|
||||
if (prev == null) {
|
||||
// first range
|
||||
assertEquals(range.min, currentRange.min);
|
||||
} else {
|
||||
// make sure produced ranges are contiguous
|
||||
assertEquals(range.min, prev.max + 1);
|
||||
}
|
||||
prev = range;
|
||||
}
|
||||
assertEquals(prev.max, currentRange.max);
|
||||
}
|
||||
|
||||
|
||||
// bias around special numbers
|
||||
int randomBound(Random rand) {
|
||||
int ret = 0;
|
||||
switch(rand.nextInt(10)) {
|
||||
case 0: ret = Integer.MIN_VALUE; break;
|
||||
case 1: ret = Integer.MAX_VALUE; break;
|
||||
case 2: ret = 0; break;
|
||||
default: ret = rand.nextInt();
|
||||
}
|
||||
if (rand.nextBoolean()) {
|
||||
ret += rand.nextInt(2000) - 1000;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomSplitRecommendations() throws Exception {
|
||||
Random rand = random();
|
||||
for (int i=0; i<10000; i++) { // 1M takes ~ 1 sec
|
||||
doRandomSplitRecommendation(rand);
|
||||
}
|
||||
}
|
||||
|
||||
public void doRandomSplitRecommendation(Random rand) throws Exception {
|
||||
int low = 0;
|
||||
int high = 0;
|
||||
|
||||
while (high-low < 10) {
|
||||
low = randomBound(rand);
|
||||
high = randomBound(rand);
|
||||
if (low > high) {
|
||||
int tmp = low;
|
||||
low = high;
|
||||
high = tmp;
|
||||
}
|
||||
}
|
||||
|
||||
DocRouter.Range curr = new DocRouter.Range(low,high);
|
||||
|
||||
|
||||
int maxRanges = rand.nextInt(20);
|
||||
|
||||
int start = low;
|
||||
|
||||
// bucket can start before or after
|
||||
if (rand.nextBoolean()) {
|
||||
start += rand.nextInt(200) - 100;
|
||||
if (start > low) {
|
||||
// underflow
|
||||
start = Integer.MIN_VALUE;
|
||||
}
|
||||
}
|
||||
|
||||
List<SplitOp.RangeCount> counts = new ArrayList<>(maxRanges);
|
||||
for (;;) {
|
||||
int end = start + rand.nextInt(100) + 1;
|
||||
if (end < start) {
|
||||
// overflow
|
||||
end = Integer.MAX_VALUE;
|
||||
}
|
||||
counts.add( new SplitOp.RangeCount(new DocRouter.Range(start, end), rand.nextInt(1000)+1));
|
||||
if (counts.size() >= maxRanges) break;
|
||||
if (counts.size() == maxRanges / 2 && rand.nextBoolean()) {
|
||||
// transition toward the end of the range (more boundary cases for large ranges)
|
||||
start = high - rand.nextInt(100);
|
||||
start = Math.max(start, end+1);
|
||||
} else {
|
||||
start = end + 1;
|
||||
}
|
||||
if (rand.nextBoolean()) {
|
||||
start += rand.nextInt(100);
|
||||
}
|
||||
if (start < end) {
|
||||
// overflow
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
Collection<DocRouter.Range> results = SplitOp.getSplits(counts, curr);
|
||||
verifyContiguous(results, curr);
|
||||
} catch (Throwable e) {
|
||||
// System.err.println(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSplitRecommendations() throws Exception {
|
||||
|
||||
// split whole range exactly in two
|
||||
DocRouter.Range curr = new DocRouter.Range(10,15);
|
||||
List<SplitOp.RangeCount> counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,15), 100));
|
||||
Collection<DocRouter.Range> results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(12, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
// make sure range with docs is split in half even if current range of shard is bigger
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,15), 100));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(12, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
// don't freak out if we encounter some ranges outside of the current defined shard range
|
||||
// this can happen since document routing can be overridden.
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(-1000,-990), 100));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(-980,-970), 2));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,15), 100));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(1000,1010), 5));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(1020,1030), 7));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(12, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
|
||||
// splitting counts of [1,4,3] should result in [1,4],[3]
|
||||
// splitting count sof [3,4,1] should result in [3],[4,1]
|
||||
// The current implementation has specific code for the latter case (hence this is needed for code coverage)
|
||||
// The random tests *should* catch this as well though.
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(0,9), 1));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,19), 4));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(20,29), 3));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(19, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(0,9), 3));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,19), 4));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(20,29), 1));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(9, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
|
||||
// test that if largest count is first
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(0,9), 4));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,19), 1));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(20,29), 1));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(9, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
|
||||
// test that if largest count is last (this has specific code since we don't get over midpoint until the last range and then need to back up)
|
||||
curr = new DocRouter.Range(-100,101);
|
||||
counts = new ArrayList<>();
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(0,9), 1));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(10,19), 1));
|
||||
counts.add(new SplitOp.RangeCount(new DocRouter.Range(20,29), 4));
|
||||
results = SplitOp.getSplits(counts, curr);
|
||||
assertEquals(19, results.iterator().next().max);
|
||||
verifyContiguous(results, curr);
|
||||
}
|
||||
|
||||
}
|
|
@ -1357,6 +1357,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
protected String splitKey;
|
||||
protected String shard;
|
||||
protected String splitMethod;
|
||||
protected Boolean splitByPrefix;
|
||||
protected Integer numSubShards;
|
||||
protected Float splitFuzz;
|
||||
|
||||
|
@ -1420,6 +1421,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
return this;
|
||||
}
|
||||
|
||||
public Boolean getSplitByPrefix() {
|
||||
return splitByPrefix;
|
||||
}
|
||||
|
||||
public SplitShard setSplitByPrefix(Boolean splitByPrefix) {
|
||||
this.splitByPrefix = splitByPrefix;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
|
@ -1441,6 +1451,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
|
|||
params.set(CommonAdminParams.SPLIT_FUZZ, String.valueOf(splitFuzz));
|
||||
}
|
||||
|
||||
if (splitByPrefix != null) {
|
||||
params.set(CommonAdminParams.SPLIT_BY_PREFIX, splitByPrefix);
|
||||
}
|
||||
|
||||
if(properties != null) {
|
||||
addProperties(params, properties);
|
||||
}
|
||||
|
|
|
@ -73,6 +73,23 @@ public class CompositeIdRouter extends HashBasedRouter {
|
|||
return new KeyParser(routeKey).getRange();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Range getSearchRangeSingle(String shardKey, SolrParams params, DocCollection collection) {
|
||||
if (shardKey == null) {
|
||||
// search across whole range
|
||||
return fullRange();
|
||||
}
|
||||
String id = shardKey;
|
||||
|
||||
if (shardKey.indexOf(SEPARATOR) < 0) {
|
||||
// shardKey is a simple id, so don't do a range
|
||||
int hash = Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
|
||||
return new Range(hash, hash);
|
||||
}
|
||||
|
||||
return new KeyParser(id).getRange();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
|
||||
if (shardKey == null) {
|
||||
|
|
|
@ -221,6 +221,14 @@ public abstract class DocRouter {
|
|||
**/
|
||||
public abstract Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection);
|
||||
|
||||
/** This method is consulted to determine what search range (the part of the hash ring) should be queried for a request when
|
||||
* an explicit shards parameter was not used.
|
||||
* This method only accepts a single shard key (or null).
|
||||
*/
|
||||
public Range getSearchRangeSingle(String shardKey, SolrParams params, DocCollection collection) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
|
||||
|
||||
public abstract String getName();
|
||||
|
|
|
@ -27,6 +27,8 @@ public interface CommonAdminParams
|
|||
String IN_PLACE_MOVE = "inPlaceMove";
|
||||
/** Method to use for shard splitting. */
|
||||
String SPLIT_METHOD = "splitMethod";
|
||||
/** Check distribution of documents to prefixes in shard to determine how to split */
|
||||
String SPLIT_BY_PREFIX = "splitByPrefix";
|
||||
/** Number of sub-shards to create. **/
|
||||
String NUM_SUB_SHARDS = "numSubShards";
|
||||
/** Timeout for replicas to become active. */
|
||||
|
|
|
@ -85,6 +85,8 @@ public abstract class CoreAdminParams
|
|||
/** The hash ranges to be used to split a shard or an index */
|
||||
public final static String RANGES = "ranges";
|
||||
|
||||
public final static String GET_RANGES = "getRanges";
|
||||
|
||||
public static final String ROLES = "roles";
|
||||
|
||||
public static final String REQUESTID = "requestid";
|
||||
|
|
Loading…
Reference in New Issue