LUCENE-1585: Allow to control how payloads are merged (trunk)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@944220 13f79535-47bb-0310-9956-ffa450edef68
Shai Erera 2010-05-14 12:12:00 +00:00
parent f753fb2873
commit cb13ac0de9
10 changed files with 428 additions and 7 deletions

CHANGES.txt

@@ -163,6 +163,13 @@ New features
applications that have many unique terms, since it reduces how often
a new segment must be flushed given a fixed RAM buffer size.
* LUCENE-1585: IndexWriter now accepts a PayloadProcessorProvider which can
return a DirPayloadProcessor for a given Directory, which returns a
PayloadProcessor for a given Term. The PayloadProcessor will be used to
process the payloads of the segments as they are merged (e.g. if one wants to
rewrite payloads of external indexes as they are added, or of local ones).
(Shai Erera, Michael Busch, Mike McCandless)
======================= Lucene 3.x (not yet released) =======================
Changes in backwards compatibility policy
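
To make the new hooks concrete, here is a minimal sketch of a provider that rewrites every payload during merges. The provider/processor API is the one added by this commit; the bit-masking logic is purely illustrative:

import java.io.IOException;

import org.apache.lucene.index.PayloadProcessorProvider;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;

public class MaskingPayloadProcessorProvider extends PayloadProcessorProvider {
  @Override
  public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
    // same processor for every directory and every term
    return new DirPayloadProcessor() {
      @Override
      public PayloadProcessor getProcessor(String field, BytesRef text) throws IOException {
        return new PayloadProcessor() {
          @Override
          public void processPayload(BytesRef payload) throws IOException {
            // clear the high bit of every payload byte, in place
            for (int i = payload.offset; i < payload.offset + payload.length; i++) {
              payload.bytes[i] &= 0x7F;
            }
          }
        };
      }
    };
  }
}

A writer installs it via writer.setPayloadProcessorProvider(new MaskingPayloadProcessorProvider()); after that, every merge rewrites the payloads it copies.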

org/apache/lucene/index/IndexWriter.java

@@ -20,6 +20,7 @@ package org.apache.lucene.index;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
@@ -326,6 +327,9 @@ public class IndexWriter implements Closeable {
// to allow users to query an IndexWriter's settings.
private final IndexWriterConfig config;
// The PayloadProcessorProvider to use when segments are merged
private PayloadProcessorProvider payloadProcessorProvider;
/**
* Expert: returns a readonly reader, covering all
* committed as well as un-committed changes to the index.
@@ -3319,7 +3323,7 @@ public class IndexWriter implements Closeable {
try {
mergedName = newSegmentName();
merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs);
merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs, payloadProcessorProvider);
SegmentReader sReader = null;
synchronized(this) {
@@ -4340,7 +4344,7 @@
if (infoStream != null)
message("merging " + merge.segString(directory));
merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs);
merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs, payloadProcessorProvider);
merge.readers = new SegmentReader[numSegments];
merge.readersClone = new SegmentReader[numSegments];
@@ -4974,5 +4978,36 @@
deleter.deletePendingFiles();
deleter.revisitPolicy();
}
/**
* Sets the {@link PayloadProcessorProvider} to use when merging payloads.
* Note that the given <code>pcp</code> will be invoked for every segment that
* is merged, not only external ones that are given through
* {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}.
* If you want only the payloads of the external segments to be processed, you
* can return <code>null</code> whenever a {@link DirPayloadProcessor} is
* requested for the {@link Directory} of the {@link IndexWriter}.
* <p>
* The default is <code>null</code>, which means payloads are processed
* normally (copied) during segment merges. Passing <code>null</code> also
* unsets a previously set provider.
* <p>
* <b>NOTE:</b> the given {@link PayloadProcessorProvider} takes effect
* immediately, potentially for merges that are already running. If you want
* it to apply only to subsequent operations, such as {@link #addIndexes} or
* {@link #optimize}, call {@link #waitForMerges()} beforehand.
*/
public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
payloadProcessorProvider = pcp;
}
/**
* Returns the {@link PayloadProcessorProvider} that is used during segment
* merges to process payloads.
*/
public PayloadProcessorProvider getPayloadProcessorProvider() {
return payloadProcessorProvider;
}
}
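
A hedged usage sketch of the external-only pattern the javadoc above describes: return null for the writer's own Directory so only the added indexes are processed. Here writer and externalReaders are assumed to exist, and PerTermPayloadProcessor stands in for any DirPayloadProcessor (one is defined in the test added further below):

final Directory writerDir = writer.getDirectory();
writer.waitForMerges(); // make sure no already-running merge picks up the provider
writer.setPayloadProcessorProvider(new PayloadProcessorProvider() {
  @Override
  public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
    // same Directory instance => one of the writer's own segments (the merge
    // code passes each reader's directory(), so identity comparison suffices)
    return dir == writerDir ? null : new PerTermPayloadProcessor();
  }
});
writer.addIndexes(externalReaders); // payloads are rewritten as the segments merge in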

org/apache/lucene/index/PayloadProcessorProvider.java

@@ -0,0 +1,81 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
/**
* Provides a {@link DirPayloadProcessor} to be used for a {@link Directory}.
* This allows using different {@link DirPayloadProcessor}s for different
* directories, e.g. to process the payloads of each directory differently.
* <p>
* <b>NOTE:</b> to avoid processing payloads of certain directories, you can
* return <code>null</code> in {@link #getDirProcessor}.
* <p>
* <b>NOTE:</b> a {@link DirPayloadProcessor} may be requested for the same
* {@link Directory} concurrently. Therefore, to avoid concurrency issues, you
* should return different instances for different threads. Usually, if your
* {@link DirPayloadProcessor} does not maintain state, this is not a problem.
* The merge code ensures that each {@link DirPayloadProcessor} instance you
* return is accessed by a single thread to obtain the
* {@link PayloadProcessor}s for different terms.
*
* @lucene.experimental
*/
public abstract class PayloadProcessorProvider {
/**
* Provides a {@link PayloadProcessor} for a given {@link Term}, which allows
* processing the payloads of different terms differently. If you intend to
* process all your payloads the same way, you can ignore the given term.
* <p>
* <b>NOTE:</b> if you protect your {@link DirPayloadProcessor} from
* concurrency issues, then you shouldn't worry about any such issues when
* {@link PayloadProcessor}s are requested for different terms.
*/
public static abstract class DirPayloadProcessor {
/** Returns a {@link PayloadProcessor} for the given term. */
public abstract PayloadProcessor getProcessor(String field, BytesRef text) throws IOException;
}
/**
* Processes the given payload.
*
* @lucene.experimental
*/
public static abstract class PayloadProcessor {
/** Processes the incoming payload and stores the result in the given {@link BytesRef}. */
public abstract void processPayload(BytesRef payload) throws IOException;
}
/**
* Returns a {@link DirPayloadProcessor} for the given {@link Directory},
* through which {@link PayloadProcessor}s can be obtained for each
* {@link Term}, or <code>null</code> if none should be used.
*/
public abstract DirPayloadProcessor getDirProcessor(Directory dir) throws IOException;
}
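
As a sketch of the per-term hook: a stateless DirPayloadProcessor, safe to share across merge threads per the note above, that rewrites payloads only for a hypothetical "anchor" field. Returning null for all other terms leaves their payloads untouched, as the merge code only invokes non-null processors:

import java.io.IOException;

import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.util.BytesRef;

public class AnchorFieldPayloadProcessor extends DirPayloadProcessor {
  @Override
  public PayloadProcessor getProcessor(String field, BytesRef text) throws IOException {
    if (!"anchor".equals(field)) {
      return null; // payloads of other fields are copied unchanged
    }
    return new PayloadProcessor() {
      @Override
      public void processPayload(BytesRef payload) throws IOException {
        if (payload.length > 0) {
          payload.bytes[payload.offset] = 0; // illustrative in-place rewrite
        }
      }
    };
  }
}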

org/apache/lucene/index/SegmentMerger.java

@@ -79,7 +79,10 @@ final class SegmentMerger {
private Codec codec;
private SegmentWriteState segmentWriteState;
SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs) {
private PayloadProcessorProvider payloadProcessorProvider;
SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs, PayloadProcessorProvider payloadProcessorProvider) {
this.payloadProcessorProvider = payloadProcessorProvider;
directory = dir;
this.codecs = codecs;
segment = name;
@@ -597,6 +600,9 @@
mergeState.delCounts = new int[mergeState.readerCount];
mergeState.docMaps = new int[mergeState.readerCount][];
mergeState.docBase = new int[mergeState.readerCount];
mergeState.hasPayloadProcessorProvider = payloadProcessorProvider != null;
mergeState.dirPayloadProcessor = new PayloadProcessorProvider.DirPayloadProcessor[mergeState.readerCount];
mergeState.currentPayloadProcessor = new PayloadProcessorProvider.PayloadProcessor[mergeState.readerCount];
docBase = 0;
int inputDocBase = 0;
@@ -629,6 +635,10 @@
}
assert delCount == mergeState.delCounts[i]: "reader delCount=" + mergeState.delCounts[i] + " vs recomputed delCount=" + delCount;
}
if (payloadProcessorProvider != null) {
mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
}
}
starts[mergeState.readerCount] = inputDocBase;

org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java

@@ -110,7 +110,11 @@ public final class MappingMultiDocsAndPositionsEnum extends DocsAndPositionsEnum
@Override
public BytesRef getPayload() throws IOException {
return current.getPayload();
BytesRef payload = current.getPayload();
if (mergeState.currentPayloadProcessor[upto] != null) {
mergeState.currentPayloadProcessor[upto].processPayload(payload);
}
return payload;
}
@Override

org/apache/lucene/index/codecs/MergeState.java

@@ -20,6 +20,8 @@ package org.apache.lucene.index.codecs;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.util.Bits;
import java.util.List;
@@ -38,5 +40,10 @@ public class MergeState {
// Updated per field;
public FieldInfo fieldInfo;
// Used to process payloads
public boolean hasPayloadProcessorProvider;
public DirPayloadProcessor[] dirPayloadProcessor;
public PayloadProcessor[] currentPayloadProcessor;
}

org/apache/lucene/index/codecs/TermsConsumer.java

@@ -85,6 +85,14 @@ public abstract class TermsConsumer {
postingsEnumIn = (MultiDocsAndPositionsEnum) termsEnum.docsAndPositions(mergeState.multiDeletedDocs, postingsEnumIn);
if (postingsEnumIn != null) {
postingsEnum.reset(postingsEnumIn);
// set PayloadProcessor
if (mergeState.hasPayloadProcessorProvider) {
for (int i = 0; i < mergeState.readerCount; i++) {
if (mergeState.dirPayloadProcessor[i] != null) {
mergeState.currentPayloadProcessor[i] = mergeState.dirPayloadProcessor[i].getProcessor(mergeState.fieldInfo.name, term);
}
}
}
final PostingsConsumer postingsConsumer = startTerm(term);
final int numDocs = postingsConsumer.merge(mergeState, postingsEnum);
if (numDocs > 0) {

org/apache/lucene/index/TestDoc.java

@@ -186,7 +186,7 @@ public class TestDoc extends LuceneTestCase {
SegmentReader r1 = SegmentReader.get(true, si1, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
SegmentReader r2 = SegmentReader.get(true, si2, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault());
SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault(), null);
merger.add(r1);
merger.add(r2);

org/apache/lucene/index/TestPayloadProcessorProvider.java

@@ -0,0 +1,269 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCaseJ4;
import org.junit.Test;
public class TestPayloadProcessorProvider extends LuceneTestCaseJ4 {
private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
private Map<Directory, DirPayloadProcessor> processors;
public PerDirPayloadProcessor(Map<Directory, DirPayloadProcessor> processors) {
this.processors = processors;
}
@Override
public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
return processors.get(dir);
}
}
private static final class PerTermPayloadProcessor extends DirPayloadProcessor {
@Override
public PayloadProcessor getProcessor(String field, BytesRef text) throws IOException {
// don't process payloads of terms other than "p:p1"
if (!field.equals("p") || !text.bytesEquals(new BytesRef("p1"))) {
return null;
}
// the remaining term (p:p1) has its payloads deleted
return new DeletePayloadProcessor();
}
}
/** deletes the incoming payload */
private static final class DeletePayloadProcessor extends PayloadProcessor {
@Override
public void processPayload(BytesRef payload) throws IOException {
payload.length = 0;
}
}
private static final class PayloadTokenStream extends TokenStream {
private final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
private boolean called = false;
private String t;
public PayloadTokenStream(String t) {
this.t = t;
}
@Override
public boolean incrementToken() throws IOException {
if (called) {
return false;
}
called = true;
byte[] p = new byte[] { 1 };
payload.setPayload(new Payload(p));
term.append(t);
return true;
}
@Override
public void reset() throws IOException {
super.reset();
called = false;
term.setEmpty();
}
}
private static final int NUM_DOCS = 10;
private IndexWriterConfig getConfig() {
return new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(
TEST_VERSION_CURRENT));
}
private void populateDirs(Directory[] dirs, boolean multipleCommits)
throws IOException {
for (int i = 0; i < dirs.length; i++) {
dirs[i] = new MockRAMDirectory();
populateDocs(dirs[i], multipleCommits);
verifyPayloadExists(dirs[i], "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dirs[i], "p", new BytesRef("p2"), NUM_DOCS);
}
}
private void populateDocs(Directory dir, boolean multipleCommits)
throws IOException {
IndexWriter writer = new IndexWriter(dir, getConfig());
TokenStream payloadTS1 = new PayloadTokenStream("p1");
TokenStream payloadTS2 = new PayloadTokenStream("p2");
for (int i = 0; i < NUM_DOCS; i++) {
Document doc = new Document();
doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
doc.add(new Field("content", "doc content " + i, Store.NO, Index.ANALYZED));
doc.add(new Field("p", payloadTS1));
doc.add(new Field("p", payloadTS2));
writer.addDocument(doc);
if (multipleCommits && (i % 4 == 0)) {
writer.commit();
}
}
writer.close();
}
private void verifyPayloadExists(Directory dir, String field, BytesRef text, int numExpected)
throws IOException {
IndexReader reader = IndexReader.open(dir);
try {
int numPayloads = 0;
DocsAndPositionsEnum tpe = MultiFields.getTermPositionsEnum(reader, null, field, text);
while (tpe.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
tpe.nextPosition();
if (tpe.hasPayload()) {
BytesRef payload = tpe.getPayload();
assertEquals(1, payload.length);
assertEquals(1, payload.bytes[0]);
++numPayloads;
}
}
assertEquals(numExpected, numPayloads);
} finally {
reader.close();
}
}
private void doTest(boolean addIndexesNoOptimize, boolean addToEmptyIndex,
int numExpectedPayloads, boolean multipleCommits) throws IOException {
Directory[] dirs = new Directory[2];
populateDirs(dirs, multipleCommits);
Directory dir = new MockRAMDirectory();
if (!addToEmptyIndex) {
populateDocs(dir, multipleCommits);
verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
}
// Add two source dirs. By not adding the dest dir, we ensure its payloads
// won't get processed.
Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
for (Directory d : dirs) {
processors.put(d, new PerTermPayloadProcessor());
}
IndexWriter writer = new IndexWriter(dir, getConfig());
writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
if (!addIndexesNoOptimize) {
IndexReader[] readers = new IndexReader[dirs.length];
for (int i = 0; i < readers.length; i++) {
readers[i] = IndexReader.open(dirs[i]);
}
try {
writer.addIndexes(readers);
} finally {
for (IndexReader r : readers) {
r.close();
}
}
} else {
writer.addIndexesNoOptimize(dirs);
}
writer.close();
verifyPayloadExists(dir, "p", new BytesRef("p1"), numExpectedPayloads);
// the second term should always have all payloads
numExpectedPayloads = NUM_DOCS * dirs.length
+ (addToEmptyIndex ? 0 : NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), numExpectedPayloads);
}
@Test
public void testAddIndexes() throws Exception {
// addIndexes - single commit in each
doTest(false, true, 0, false);
// addIndexes - multiple commits in each
doTest(false, true, 0, true);
// addIndexesNoOptimize - single commit in each
doTest(true, true, 0, false);
// addIndexesNoOptimize - multiple commits in each
doTest(true, true, 0, true);
}
@Test
public void testAddIndexesIntoExisting() throws Exception {
// addIndexes - single commit in each
doTest(false, false, NUM_DOCS, false);
// addIndexes - multiple commits in each
doTest(false, false, NUM_DOCS, true);
// addIndexesNoOptimize - single commit in each
doTest(true, false, NUM_DOCS, false);
// addIndexesNoOptimize - multiple commits in each
doTest(true, false, NUM_DOCS, true);
}
@Test
public void testRegularMerges() throws Exception {
Directory dir = new MockRAMDirectory();
populateDocs(dir, true);
verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
// Register a processor for the index's own directory, so that its payloads
// are processed during regular (optimize) merges.
Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
processors.put(dir, new PerTermPayloadProcessor());
IndexWriter writer = new IndexWriter(dir, getConfig());
writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
writer.optimize();
writer.close();
verifyPayloadExists(dir, "p", new BytesRef("p1"), 0);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
}
}

org/apache/lucene/index/TestSegmentMerger.java

@@ -65,7 +65,7 @@ public class TestSegmentMerger extends LuceneTestCase {
}
public void testMerge() throws IOException {
SegmentMerger merger = new SegmentMerger(mergedDir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, mergedSegment, null, CodecProvider.getDefault());
SegmentMerger merger = new SegmentMerger(mergedDir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, mergedSegment, null, CodecProvider.getDefault(), null);
merger.add(reader1);
merger.add(reader2);
int docsMerged = merger.merge();