SOLR-3755: basic shard splitting code
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1379195 13f79535-47bb-0310-9956-ffa450edef68
parent cfef80af55
commit 7754d1d105
@@ -19,10 +19,13 @@ package org.apache.solr.handler.admin;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

@@ -36,6 +39,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;

@@ -57,6 +61,7 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.SplitIndexCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.NumberUtils;

@@ -171,6 +176,11 @@ public class CoreAdminHandler extends RequestHandlerBase {
        break;
      }

      case SPLIT: {
        doPersist = this.handleSplitAction(req, rsp);
        break;
      }

      case PREPRECOVERY: {
        this.handleWaitForStateAction(req, rsp);
        break;

@@ -202,6 +212,62 @@ public class CoreAdminHandler extends RequestHandlerBase {
    rsp.setHttpCaching(false);
  }

  protected boolean handleSplitAction(SolrQueryRequest adminReq, SolrQueryResponse rsp) throws IOException {
    SolrParams params = adminReq.getParams();
    // partitions=N (split into N partitions, leaving it up to solr what the ranges are and where to put them)
    // path - multiValued param, or comma separated param? Only creates indexes, not cores

    List<HashPartitioner.Range> ranges = null;
    // boolean closeDirectories = true;
    // DirectoryFactory dirFactory = null;

    String cname = params.get(CoreAdminParams.CORE, "");
    SolrCore core = coreContainer.getCore(cname);
    SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
    try {
      String[] pathsArr = params.getParams("path");
      List<String> paths = null;

      String rangesStr = params.get("ranges");  // ranges=a-b,c-d,e-f

      // dirFactory = core.getDirectoryFactory();

      if (pathsArr != null) {
        paths = Arrays.asList(pathsArr);
        if (rangesStr == null) {
          HashPartitioner hp = new HashPartitioner();  // should this be static?
          // TODO: use real range if we know it. If we don't know it, we should prob
          // split on every other doc rather than on a hash?
          ranges = hp.partitionRange(pathsArr.length, Integer.MIN_VALUE, Integer.MAX_VALUE);
        }
      }

      SplitIndexCommand cmd = new SplitIndexCommand(req, paths, ranges);
      core.getUpdateHandler().split(cmd);
    } catch (Exception e) {
      log.error("ERROR executing split:", e);
      throw new RuntimeException(e);
    } finally {
      if (req != null) req.close();
      if (core != null) core.close();
    }

    return false;
  }

  protected boolean handleMergeAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
    SolrParams params = req.getParams();
    String cname = params.required().get(CoreAdminParams.CORE);
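As a quick illustration of the fallback in handleSplitAction above, the sketch below (a standalone, hypothetical driver; the class name and target paths are invented) shows how the full 32-bit hash space is divided evenly when only path parameters are supplied and no ranges are given:

import java.util.List;

import org.apache.solr.common.cloud.HashPartitioner;

// Hypothetical driver showing the fallback used by handleSplitAction when no
// explicit "ranges" parameter is given: one evenly sized, inclusive hash range
// per target path, covering the full 32-bit hash space.
public class DefaultRangesSketch {
  public static void main(String[] args) {
    String[] pathsArr = {"/tmp/split0", "/tmp/split1", "/tmp/split2"};  // hypothetical target index paths

    HashPartitioner hp = new HashPartitioner();
    List<HashPartitioner.Range> ranges =
        hp.partitionRange(pathsArr.length, Integer.MIN_VALUE, Integer.MAX_VALUE);

    for (int i = 0; i < ranges.size(); i++) {
      // Range.toString() prints the hex min-max of each partition.
      System.out.println(pathsArr[i] + " -> " + ranges.get(i));
    }
  }
}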
@@ -738,6 +738,13 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
    }
  }

  @Override
  public void split(SplitIndexCommand cmd) throws IOException {
    // TODO: do a commit first?
    SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
    splitter.split();
  }

  /////////////////////////////////////////////////////////////////////
  // SolrInfoMBean stuff: Statistics and Module Info
  /////////////////////////////////////////////////////////////////////
solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (new file, 183 lines)
@@ -0,0 +1,183 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.update;

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterAtomicReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OpenBitSet;
import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.util.Hash;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SolrIndexSplitter {
  public static Logger log = LoggerFactory.getLogger(SolrIndexSplitter.class);

  SolrIndexSearcher searcher;
  SchemaField field;
  List<HashPartitioner.Range> ranges;
  HashPartitioner.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
  List<String> paths;

  public SolrIndexSplitter(SplitIndexCommand cmd) {
    field = cmd.getReq().getSchema().getUniqueKeyField();
    searcher = cmd.getReq().getSearcher();
    ranges = cmd.ranges;
    rangesArr = ranges.toArray(new HashPartitioner.Range[ranges.size()]);
    paths = cmd.paths;
  }

  public void split() throws IOException {
    List<AtomicReaderContext> leaves = searcher.getTopReaderContext().leaves();
    List<OpenBitSet[]> segmentDocSets = new ArrayList<OpenBitSet[]>(leaves.size());

    log.info("SolrIndexSplitter: partitions=" + ranges.size() + " segments=" + leaves.size());

    for (AtomicReaderContext readerContext : leaves) {
      assert readerContext.ordInParent == segmentDocSets.size();  // make sure we're going in order
      OpenBitSet[] docSets = split(readerContext);
      segmentDocSets.add( docSets );
    }

    // would it be more efficient to write segment-at-a-time to each new index?
    // - need to worry about number of open descriptors
    // - need to worry about if IW.addIndexes does a sync or not...

    IndexReader[] subReaders = new IndexReader[leaves.size()];
    for (int partitionNumber=0; partitionNumber<ranges.size(); partitionNumber++) {
      log.info("SolrIndexSplitter: partition #" + partitionNumber + " range=" + ranges.get(partitionNumber));

      for (int segmentNumber = 0; segmentNumber<subReaders.length; segmentNumber++) {
        subReaders[segmentNumber] = new LiveDocsReader( leaves.get(segmentNumber), segmentDocSets.get(segmentNumber)[partitionNumber] );
      }

      String path = paths.get(partitionNumber);
      boolean success = false;
      SolrCore core = searcher.getCore();
      IndexWriter iw = new SolrIndexWriter("SplittingIndexWriter"+partitionNumber + " " + ranges.get(partitionNumber), path,
                                           core.getDirectoryFactory(), true, core.getSchema(),
                                           core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), true);

      try {
        iw.addIndexes(subReaders);
        // TODO: will many deletes have been removed, or should we optimize?
        success = true;
      } finally {
        if (success) {
          IOUtils.close(iw);
        } else {
          IOUtils.closeWhileHandlingException(iw);
        }
      }
    }
  }

  OpenBitSet[] split(AtomicReaderContext readerContext) throws IOException {
    AtomicReader reader = readerContext.reader();
    OpenBitSet[] docSets = new OpenBitSet[ranges.size()];
    for (int i=0; i<docSets.length; i++) {
      docSets[i] = new OpenBitSet(reader.maxDoc());
    }
    Bits liveDocs = reader.getLiveDocs();

    Fields fields = reader.fields();
    Terms terms = fields==null ? null : fields.terms(field.getName());
    TermsEnum termsEnum = terms==null ? null : terms.iterator(null);
    if (termsEnum == null) return docSets;

    BytesRef term = null;
    DocsEnum docsEnum = null;

    for (;;) {
      term = termsEnum.next();
      if (term == null) break;

      // figure out the hash for the term
      // TODO: hook in custom hashes (or store hashes)
      int hash = Hash.murmurhash3_x86_32(term.bytes, term.offset, term.length, 0);

      docsEnum = termsEnum.docs(liveDocs, docsEnum, 0x0);
      for (;;) {
        int doc = docsEnum.nextDoc();
        if (doc == DocsEnum.NO_MORE_DOCS) break;
        for (int i=0; i<rangesArr.length; i++) {  // inner-loop: use array here for extra speed.
          if (rangesArr[i].includes(hash)) {
            docSets[i].fastSet(doc);
          }
        }
      }
    }

    return docSets;
  }

  // change livedocs on the reader to delete those docs we don't want
  static class LiveDocsReader extends FilterAtomicReader {
    final OpenBitSet liveDocs;
    final int numDocs;

    public LiveDocsReader(AtomicReaderContext context, OpenBitSet liveDocs) throws IOException {
      super(context.reader());
      this.liveDocs = liveDocs;
      this.numDocs = (int)liveDocs.cardinality();
    }

    @Override
    public int numDocs() {
      return numDocs;
    }

    @Override
    public boolean hasDeletions() {
      return (in.maxDoc() != numDocs);
    }

    @Override
    public Bits getLiveDocs() {
      return liveDocs;
    }
  }

}
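The per-document routing rule inside split(AtomicReaderContext) can be read in isolation. The following is a minimal sketch (a hypothetical helper, not part of the patch) of that decision: hash the unique-key bytes with murmurhash3_x86_32 and keep the document in every partition whose inclusive range contains the hash:

import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.util.Hash;

// Hypothetical illustration of the routing decision made per unique-key term
// in SolrIndexSplitter.split(AtomicReaderContext): hash the id bytes, then
// test each candidate range.
public class RoutingSketch {
  static int partitionFor(BytesRef id, HashPartitioner.Range[] rangesArr) {
    int hash = Hash.murmurhash3_x86_32(id.bytes, id.offset, id.length, 0);
    for (int i = 0; i < rangesArr.length; i++) {
      if (rangesArr[i].includes(hash)) {
        return i;   // first (and, for non-overlapping ranges, only) matching partition
      }
    }
    return -1;      // hash fell outside every supplied range
  }
}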
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.update;

import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.request.SolrQueryRequest;

import java.util.List;

/**
 * A split index command encapsulated in an object.
 *
 * @since solr 1.4
 *
 */
public class SplitIndexCommand extends UpdateCommand {
  // public List<Directory> dirs;
  public List<String> paths;
  public List<HashPartitioner.Range> ranges;
  // TODO: allow specification of custom hash function

  public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<HashPartitioner.Range> ranges) {
    super(req);
    this.paths = paths;
    this.ranges = ranges;
  }

  @Override
  public String name() {
    return "split";
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(super.toString());
    sb.append(",paths=" + paths);
    sb.append(",ranges=" + ranges);
    sb.append('}');
    return sb.toString();
  }
}
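For orientation, a small hypothetical sketch (invented class name and output directories) of driving the new command directly against a SolrCore with explicit, inclusive ranges, rather than letting CoreAdminHandler derive them from the path parameters:

import java.util.Arrays;
import java.util.List;

import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;

// Hypothetical driver: split a core's index into two target paths using
// explicit, inclusive hash ranges (the full int space split at zero).
public class SplitCommandSketch {
  public static void doSplit(SolrCore core) throws java.io.IOException {
    List<String> paths = Arrays.asList("/tmp/part0", "/tmp/part1");   // hypothetical output dirs
    List<HashPartitioner.Range> ranges = Arrays.asList(
        new HashPartitioner.Range(Integer.MIN_VALUE, -1),
        new HashPartitioner.Range(0, Integer.MAX_VALUE));

    SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
    try {
      core.getUpdateHandler().split(new SplitIndexCommand(req, paths, ranges));
    } finally {
      req.close();
    }
  }
}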
@@ -179,4 +179,6 @@ public abstract class UpdateHandler implements SolrInfoMBean
  {
    optimizeCallbacks.add( listener );
  }

  public abstract void split(SplitIndexCommand cmd) throws IOException;
}
@@ -27,12 +27,25 @@ public class TestHashPartitioner extends SolrTestCaseJ4 {

  public void testMapHashes() throws Exception {
    HashPartitioner hp = new HashPartitioner();

    for (int i = 1; i <= 30000; i++) {
      List<Range> ranges = hp.partitionRange(i);

    List<Range> ranges;

    // make sure the partitioner uses the "natural" boundaries and doesn't suffer from an off-by-one
    ranges = hp.partitionRange(2, Integer.MIN_VALUE, Integer.MAX_VALUE);
    assertEquals(Integer.MIN_VALUE, ranges.get(0).min);
    assertEquals(0x80000000, ranges.get(0).min);
    assertEquals(0xffffffff, ranges.get(0).max);
    assertEquals(0x00000000, ranges.get(1).min);
    assertEquals(0x7fffffff, ranges.get(1).max);

    ranges = hp.partitionRange(2, 0, 0x7fffffff);
    assertEquals(0x00000000, ranges.get(0).min);
    assertEquals(0x3fffffff, ranges.get(0).max);
    assertEquals(0x40000000, ranges.get(1).min);
    assertEquals(0x7fffffff, ranges.get(1).max);

    for (int i = 1; i <= 30000; i += 13) {
      ranges = hp.partitionRange(i, Integer.MIN_VALUE, Integer.MAX_VALUE);
      assertEquals(i, ranges.size());

      assertTrue("First range does not start before " + Integer.MIN_VALUE
          + " it is:" + ranges.get(0).min,
          ranges.get(0).min <= Integer.MIN_VALUE);
@@ -226,7 +226,7 @@ public class ClusterState implements JSONWriter.Writable {
      shardList.addAll(shards);
      Collections.sort(shardList);

      ranges = hp.partitionRange(shards.size());
      ranges = hp.partitionRange(shards.size(), Integer.MIN_VALUE, Integer.MAX_VALUE);

      rangeInfo.ranges = ranges;
      rangeInfo.shardList = shardList;

@@ -243,7 +243,7 @@ public class ClusterState implements JSONWriter.Writable {

    int cnt = 0;
    for (Range range : rangInfo.ranges) {
      if (hash < range.max) {
      if (range.includes(hash)) {
        return rangInfo.shardList.get(cnt);
      }
      cnt++;
@@ -25,39 +25,57 @@ import java.util.List;
 *
 */
public class HashPartitioner {

  // Hash ranges can't currently "wrap" - i.e. max must be greater or equal to min.
  public static class Range {
    public long min;
    public long max;
    public int min;  // inclusive
    public int max;  // inclusive

    public Range(long min, long max) {
    public Range(int min, int max) {
      this.min = min;
      this.max = max;
    }

    public boolean includes(int hash) {
      return hash >= min && hash <= max;
    }

    public String toString() {
      return Integer.toHexString(min) + '-' + Integer.toHexString(max);
    }

    public static Range fromString(String range) {
      return null;  // TODO
    }
  }

  /**
   * works up to 65537 before requested num of ranges is one short
   *
   *
   * @param partitions
   * @return Range for each partition
   */
  public List<Range> partitionRange(int partitions) {
    // some hokey code to partition the int space
    long range = Integer.MAX_VALUE + (Math.abs((long) Integer.MIN_VALUE));
    long srange = range / partitions;

  public List<Range> partitionRange(int partitions, int min, int max) {
    assert max >= min;
    long range = (long)max - (long)min;
    long srange = Math.max(1, range / partitions);

    List<Range> ranges = new ArrayList<Range>(partitions);

    long end = 0;
    long start = Integer.MIN_VALUE;

    while (end < Integer.MAX_VALUE) {

    long start = min;
    long end = start;

    while (end < max) {
      end = start + srange;
      ranges.add(new Range(start, end));
      // make last range always end exactly on MAX_VALUE
      if (ranges.size() == partitions - 1) {
        end = max;
      }
      ranges.add(new Range((int)start, (int)end));
      start = end + 1L;
    }

    return ranges;
  }

}
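A short worked example (hypothetical class name) mirroring the values asserted in TestHashPartitioner above: splitting the full int range into two partitions yields the inclusive ranges 80000000-ffffffff and 0-7fffffff:

import java.util.List;

import org.apache.solr.common.cloud.HashPartitioner;

// Worked example matching the TestHashPartitioner asserts: two partitions over
// the full int range split exactly at zero, with inclusive bounds.
public class PartitionRangeExample {
  public static void main(String[] args) {
    HashPartitioner hp = new HashPartitioner();
    List<HashPartitioner.Range> ranges =
        hp.partitionRange(2, Integer.MIN_VALUE, Integer.MAX_VALUE);
    System.out.println(ranges.get(0));  // 80000000-ffffffff
    System.out.println(ranges.get(1));  // 0-7fffffff
  }
}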
@@ -92,7 +92,8 @@ public interface CoreAdminParams
    SWAP,
    RENAME,
    MERGEINDEXES,
    PREPRECOVERY,
    SPLIT,
    PREPRECOVERY,
    REQUESTRECOVERY,
    REQUESTSYNCSHARD;