mirror of https://github.com/apache/lucene.git
LUCENE-1959 Add MultiPassIndexSplitter.
git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@824798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 701d8d0dfd
commit 7d0d4ecc44
@@ -41,10 +41,13 @@ New features
   segment merges to give better search performance in a mixed
   indexing/searching environment. (John Wang via Mike McCandless)
 
-* LUCENE-1959: Add IndexSplitter tool, to copy specific segments out
-  of the index into a new index. It can also list the segments in
-  the index, and delete specified segments. (Jason Rutherglen via
-  Mike McCandless)
+* LUCENE-1959: Add index splitting tools. The IndexSplitter tool works
+  on multi-segment (non optimized) indexes and it can copy specific
+  segments out of the index into a new index. It can also list the
+  segments in the index, and delete specified segments. (Jason Rutherglen via
+  Mike McCandless). MultiPassIndexSplitter can split any index into
+  any number of output parts, at the cost of doing multiple passes over
+  the input index. (Andrzej Bialecki)
 
 Optimizations
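For reference, a typical invocation of the new tool might look like the following (the classpath and paths are illustrative, not taken from this commit; the tool prints its exact usage when run without arguments):

    java -cp <lucene-jars> org.apache.lucene.index.MultiPassIndexSplitter -out /path/to/out -num 3 /path/to/index

This writes three partial indexes into part-0, part-1 and part-2 under the output directory, assigning documents round-robin unless -seq is given.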

@@ -0,0 +1,236 @@
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.OpenBitSet;

/**
 * This tool splits the input index into multiple equal parts. The method
 * employed here uses {@link IndexWriter#addIndexes(IndexReader[])} where the
 * input data comes from the input index with artificially applied deletes to
 * the document ids that fall outside the selected partition.
 * <p>Note 1: Deletes are only applied to a buffered list of deleted docs and
 * don't affect the source index - this tool also works with read-only indexes.
 * <p>Note 2: the disadvantage of this tool is that the source index needs to
 * be read as many times as there are parts to be created, hence the name of
 * this tool.
 */
public class MultiPassIndexSplitter {
|
||||
|
||||
/**
|
||||
* Split source index into multiple parts.
|
||||
* @param input source index, can be read-only, can have deletions, can have
|
||||
* multiple segments (or multiple readers).
|
||||
* @param outputs list of directories where the output parts will be stored.
|
||||
* @param seq if true, then the source index will be split into equal
|
||||
* increasing ranges of document id-s. If false, source document id-s will be
|
||||
* assigned in a deterministic round-robin fashion to one of the output splits.
|
||||
* @throws IOException
|
||||
*/
|
||||
  public void split(IndexReader input, Directory[] outputs, boolean seq) throws IOException {
    if (outputs == null || outputs.length < 2) {
      throw new IOException("Invalid number of outputs.");
    }
    if (input == null || input.numDocs() < 2) {
      throw new IOException("Not enough documents for splitting");
    }
    int numParts = outputs.length;
    // wrap a potentially read-only input
    // this way we don't have to preserve original deletions, because neither
    // deleteDocument(int) nor undeleteAll() is applied to the wrapped input index.
    input = new FakeDeleteIndexReader(input);
    int maxDoc = input.maxDoc();
    int partLen = maxDoc / numParts;
    for (int i = 0; i < numParts; i++) {
      input.undeleteAll();
      if (seq) { // sequential range
        int lo = partLen * i;
        int hi = lo + partLen;
        // below range
        for (int j = 0; j < lo; j++) {
          input.deleteDocument(j);
        }
        // above range - the last part collects all ids that remained due to
        // integer rounding errors
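        // e.g. with maxDoc = 11 and numParts = 3, partLen = 3, so the parts
        // cover doc ids [0,3), [3,6) and [6,11) - the last part absorbs the
        // remainder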
        if (i < numParts - 1) {
          for (int j = hi; j < maxDoc; j++) {
            input.deleteDocument(j);
          }
        }
      } else {
        // round-robin
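        // part i keeps exactly the docs with j % numParts == i: the test
        // below deletes every doc whose id is not congruent to i modulo numParts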
        for (int j = 0; j < maxDoc; j++) {
          if ((j + numParts - i) % numParts != 0) {
            input.deleteDocument(j);
          }
        }
      }
      IndexWriter w = new IndexWriter(outputs[i], new WhitespaceAnalyzer(),
          true, MaxFieldLength.UNLIMITED);
      System.err.println("Writing part " + (i + 1) + " ...");
      w.addIndexes(new IndexReader[]{input});
      w.close();
    }
    System.err.println("Done.");
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 5) {
      System.err.println("Usage: MultiPassIndexSplitter -out <outputDir> -num <numParts> [-seq] <inputIndex1> [<inputIndex2> ...]");
      System.err.println("\tinputIndex\tpath to input index, multiple values are ok");
      System.err.println("\t-out outputDir\tpath to output directory to contain partial indexes");
      System.err.println("\t-num numParts\tnumber of parts to produce");
      System.err.println("\t-seq\tsequential docid-range split (default is round-robin)");
      System.exit(-1);
    }
    ArrayList<IndexReader> indexes = new ArrayList<IndexReader>();
    String outDir = null;
    int numParts = -1;
    boolean seq = false;
    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-out")) {
        outDir = args[++i];
      } else if (args[i].equals("-num")) {
        numParts = Integer.parseInt(args[++i]);
      } else if (args[i].equals("-seq")) {
        seq = true;
      } else {
        File file = new File(args[i]);
        if (!file.exists() || !file.isDirectory()) {
          System.err.println("Invalid input path - skipping: " + file);
          continue;
        }
        Directory dir = FSDirectory.open(file);
        try {
          if (!IndexReader.indexExists(dir)) {
            System.err.println("Invalid input index - skipping: " + file);
            continue;
          }
        } catch (Exception e) {
          System.err.println("Invalid input index - skipping: " + file);
          continue;
        }
        indexes.add(IndexReader.open(dir, true));
      }
    }
    if (outDir == null) {
      throw new Exception("Required argument missing: -out outputDir");
    }
    if (numParts < 2) {
      throw new Exception("Invalid value of required argument: -num numParts");
    }
    if (indexes.size() == 0) {
      throw new Exception("No input indexes to process");
    }
    File out = new File(outDir);
    if (!out.mkdirs()) {
      throw new Exception("Can't create output directory: " + out);
    }
    Directory[] dirs = new Directory[numParts];
    for (int i = 0; i < numParts; i++) {
      dirs[i] = FSDirectory.open(new File(out, "part-" + i));
    }
    MultiPassIndexSplitter splitter = new MultiPassIndexSplitter();
    IndexReader input;
    if (indexes.size() == 1) {
      input = indexes.get(0);
    } else {
      input = new MultiReader(indexes.toArray(new IndexReader[indexes.size()]));
    }
    splitter.split(input, dirs, seq);
  }

  /**
   * This class pretends that it can write deletions to the underlying index.
   * Instead, deletions are buffered in a bitset and overlaid with the original
   * list of deletions.
   */
  public static class FakeDeleteIndexReader extends FilterIndexReader {
    OpenBitSet dels;
    OpenBitSet oldDels = null;

    public FakeDeleteIndexReader(IndexReader in) {
      super(in);
      dels = new OpenBitSet(in.maxDoc());
      if (in.hasDeletions()) {
        oldDels = new OpenBitSet(in.maxDoc());
        for (int i = 0; i < in.maxDoc(); i++) {
          if (in.isDeleted(i)) oldDels.set(i);
        }
        dels.or(oldDels);
      }
    }

    @Override
    public int numDocs() {
      return in.maxDoc() - (int)dels.cardinality();
    }

    /**
     * Just removes our overlaid deletions - does not undelete the original
     * deletions.
     */
    @Override
    protected void doUndeleteAll() throws CorruptIndexException, IOException {
      dels = new OpenBitSet(in.maxDoc());
      if (oldDels != null) {
        dels.or(oldDels);
      }
    }

    @Override
    protected void doDelete(int n) throws CorruptIndexException, IOException {
      dels.set(n);
    }

    @Override
    public boolean hasDeletions() {
      return !dels.isEmpty();
    }

    @Override
    public boolean isDeleted(int n) {
      return dels.get(n);
    }

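    // the postings must honor the overlaid deletions as well: the merge done
    // by addIndexes() walks term positions directly and expects deleted
    // documents to be skipped there, not just flagged by isDeleted()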
    @Override
    public TermPositions termPositions() throws IOException {
      return new FilterTermPositions(in.termPositions()) {

        @Override
        public boolean next() throws IOException {
          boolean res;
          while ((res = super.next())) {
            if (!dels.get(doc())) {
              break;
            }
          }
          return res;
        }
      };
    }
  }
}
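As a sketch of programmatic use (not part of this commit; the paths, the two-way split and the wrapper class are illustrative assumptions), the splitter can also be driven directly from Java:

    import java.io.File;

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.MultiPassIndexSplitter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class SplitExample {
      public static void main(String[] args) throws Exception {
        // Open the source index read-only; FakeDeleteIndexReader buffers all
        // deletions in memory, so the input index is never modified.
        IndexReader input = IndexReader.open(
            FSDirectory.open(new File("/path/to/index")), true);
        Directory[] parts = new Directory[] {
            FSDirectory.open(new File("/path/to/out/part-0")),
            FSDirectory.open(new File("/path/to/out/part-1"))
        };
        // false = round-robin assignment; true = contiguous doc-id ranges.
        new MultiPassIndexSplitter().split(input, parts, false);
        input.close();
      }
    }

Unlike the tool's main(), this skips the existence checks and directory creation, so the output directories must be creatable at the given paths.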

@@ -0,0 +1,127 @@
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

import junit.framework.TestCase;

public class TestMultiPassIndexSplitter extends TestCase {
  IndexReader input;
  int NUM_DOCS = 11;

  public void setUp() throws Exception {
    RAMDirectory dir = new RAMDirectory();
    IndexWriter w = new IndexWriter(dir, new WhitespaceAnalyzer(), true,
        MaxFieldLength.LIMITED);
    Document doc;
    for (int i = 0; i < NUM_DOCS; i++) {
      doc = new Document();
      doc.add(new Field("id", i + "", Field.Store.YES, Field.Index.NOT_ANALYZED));
      doc.add(new Field("f", i + " " + i, Field.Store.YES, Field.Index.ANALYZED));
      w.addDocument(doc);
    }
    w.close();
    input = IndexReader.open(dir, false);
    // delete the last doc
    input.deleteDocument(input.maxDoc() - 1);
    input = input.reopen(true);
  }

  /**
   * Test round-robin splitting.
   */
  public void testSplitRR() throws Exception {
    MultiPassIndexSplitter splitter = new MultiPassIndexSplitter();
    Directory[] dirs = new Directory[]{
        new RAMDirectory(),
        new RAMDirectory(),
        new RAMDirectory()
    };
    splitter.split(input, dirs, false);
    IndexReader ir;
    ir = IndexReader.open(dirs[0], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1); // rounding error
    Document doc = ir.document(0);
    assertEquals("0", doc.get("id"));
    Term t;
    TermEnum te;
    t = new Term("id", "1");
    te = ir.terms(t);
    assertNotSame(t, te.term());
    ir.close();
    ir = IndexReader.open(dirs[1], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1);
    doc = ir.document(0);
    assertEquals("1", doc.get("id"));
    t = new Term("id", "0");
    te = ir.terms(t);
    assertNotSame(t, te.term());
    ir.close();
    ir = IndexReader.open(dirs[2], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1);
    doc = ir.document(0);
    assertEquals("2", doc.get("id"));
    t = new Term("id", "1");
    te = ir.terms(t);
    assertNotSame(t, te.term());
    t = new Term("id", "0");
    te = ir.terms(t);
    assertNotSame(t, te.term());
  }

  /**
   * Test sequential splitting.
   */
  public void testSplitSeq() throws Exception {
    MultiPassIndexSplitter splitter = new MultiPassIndexSplitter();
    Directory[] dirs = new Directory[]{
        new RAMDirectory(),
        new RAMDirectory(),
        new RAMDirectory()
    };
    splitter.split(input, dirs, true);
    IndexReader ir;
    ir = IndexReader.open(dirs[0], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1);
    Document doc = ir.document(0);
    assertEquals("0", doc.get("id"));
    int start = ir.numDocs();
    ir.close();
    ir = IndexReader.open(dirs[1], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1);
    doc = ir.document(0);
    assertEquals(start + "", doc.get("id"));
    start += ir.numDocs();
    ir.close();
    ir = IndexReader.open(dirs[2], true);
    assertTrue(ir.numDocs() - NUM_DOCS / 3 <= 1);
    doc = ir.document(0);
    assertEquals(start + "", doc.get("id"));
    // make sure the deleted doc is not here
    Term t;
    TermEnum te;
    t = new Term("id", (NUM_DOCS - 1) + "");
    te = ir.terms(t);
    assertNotSame(t, te.term());
  }
}