LUCENE-4304: remove PayloadProcessorProvider, map facet ords using a FilterAtomicReader

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1372858 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Muir 2012-08-14 12:33:36 +00:00
parent 0eda73aa3c
commit 81bff57343
15 changed files with 261 additions and 680 deletions

View File

@ -35,6 +35,13 @@ API Changes
the leaf atomic reader contexts for all readers in the tree.
(Uwe Schindler, Robert Muir)
* LUCENE-4304: removed PayloadProcessorProvider. If you want to change
payloads (or other things) when merging indexes, its recommended
to just use a FilterAtomicReader + IndexWriter.addIndexes. See the
OrdinalMappingAtomicReader and TaxonomyMergeUtils in the facets
module if you want an example of this.
(Mike McCandless, Uwe Schindler, Shai Erera, Robert Muir)
Bug Fixes
* LUCENE-4297: BooleanScorer2 would multiply the coord() factor

View File

@ -123,18 +123,7 @@ public final class MappingMultiDocsAndPositionsEnum extends DocsAndPositionsEnum
@Override
public BytesRef getPayload() throws IOException {
BytesRef payload = current.getPayload();
if (mergeState.currentPayloadProcessor[upto] != null && payload != null) {
// to not violate the D&P api, we must give the processor a private copy
// TODO: reuse a BytesRef if there is a PPP
payload = BytesRef.deepCopyOf(payload);
mergeState.currentPayloadProcessor[upto].processPayload(payload);
if (payload.length == 0) {
// don't let PayloadProcessors corrumpt the index
return null;
}
}
return payload;
return current.getPayload();
}
}

View File

@ -27,8 +27,6 @@ import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.ReaderPayloadProcessor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
@ -170,12 +168,7 @@ public abstract class TermVectorsWriter implements Closeable {
final AtomicReader reader = mergeState.readers.get(i);
final int maxDoc = reader.maxDoc();
final Bits liveDocs = reader.getLiveDocs();
// set PayloadProcessor
if (mergeState.payloadProcessorProvider != null) {
mergeState.currentReaderPayloadProcessor = mergeState.readerPayloadProcessor[i];
} else {
mergeState.currentReaderPayloadProcessor = null;
}
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && !liveDocs.get(docID)) {
// skip deleted docs
@ -215,9 +208,6 @@ public abstract class TermVectorsWriter implements Closeable {
TermsEnum termsEnum = null;
DocsAndPositionsEnum docsAndPositionsEnum = null;
final ReaderPayloadProcessor readerPayloadProcessor = mergeState.currentReaderPayloadProcessor;
PayloadProcessor payloadProcessor = null;
for(String fieldName : vectors) {
final FieldInfo fieldInfo = mergeState.fieldInfos.fieldInfo(fieldName);
@ -250,10 +240,6 @@ public abstract class TermVectorsWriter implements Closeable {
final int freq = (int) termsEnum.totalTermFreq();
startTerm(termsEnum.term(), freq);
if (hasPayloads && readerPayloadProcessor != null) {
payloadProcessor = readerPayloadProcessor.getProcessor(fieldName, termsEnum.term());
}
if (hasPositions || hasOffsets) {
docsAndPositionsEnum = termsEnum.docsAndPositions(null, docsAndPositionsEnum);
@ -268,17 +254,7 @@ public abstract class TermVectorsWriter implements Closeable {
final int startOffset = docsAndPositionsEnum.startOffset();
final int endOffset = docsAndPositionsEnum.endOffset();
BytesRef payload = docsAndPositionsEnum.getPayload();
if (payloadProcessor != null && payload != null) {
// to not violate the D&P api, we must give the processor a private copy
payload = BytesRef.deepCopyOf(payload);
payloadProcessor.processPayload(payload);
if (payload.length == 0) {
// don't let PayloadProcessors corrumpt the index
payload = null;
}
}
final BytesRef payload = docsAndPositionsEnum.getPayload();
assert !hasPositions || pos >= 0;
addPosition(pos, startOffset, endOffset, payload);

View File

@ -154,14 +154,7 @@ public abstract class TermsConsumer {
postingsEnumIn = (MultiDocsAndPositionsEnum) termsEnum.docsAndPositions(null, postingsEnumIn, DocsAndPositionsEnum.FLAG_PAYLOADS);
assert postingsEnumIn != null;
postingsEnum.reset(postingsEnumIn);
// set PayloadProcessor
if (mergeState.payloadProcessorProvider != null) {
for (int i = 0; i < mergeState.readers.size(); i++) {
if (mergeState.readerPayloadProcessor[i] != null) {
mergeState.currentPayloadProcessor[i] = mergeState.readerPayloadProcessor[i].getProcessor(mergeState.fieldInfo.name, term);
}
}
}
final PostingsConsumer postingsConsumer = startTerm(term);
final TermStats stats = postingsConsumer.merge(mergeState, postingsEnum, visitedDocs);
if (stats.docFreq > 0) {
@ -188,14 +181,7 @@ public abstract class TermsConsumer {
postingsEnumIn = (MultiDocsAndPositionsEnum) termsEnum.docsAndPositions(null, postingsEnumIn);
assert postingsEnumIn != null;
postingsEnum.reset(postingsEnumIn);
// set PayloadProcessor
if (mergeState.payloadProcessorProvider != null) {
for (int i = 0; i < mergeState.readers.size(); i++) {
if (mergeState.readerPayloadProcessor[i] != null) {
mergeState.currentPayloadProcessor[i] = mergeState.readerPayloadProcessor[i].getProcessor(mergeState.fieldInfo.name, term);
}
}
}
final PostingsConsumer postingsConsumer = startTerm(term);
final TermStats stats = postingsConsumer.merge(mergeState, postingsEnum, visitedDocs);
if (stats.docFreq > 0) {

View File

@ -315,12 +315,7 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
int numDocs = 0;
for (int i = 0; i < mergeState.readers.size(); i++) {
final AtomicReader reader = mergeState.readers.get(i);
// set PayloadProcessor
if (mergeState.payloadProcessorProvider != null) {
mergeState.currentReaderPayloadProcessor = mergeState.readerPayloadProcessor[i];
} else {
mergeState.currentReaderPayloadProcessor = null;
}
final SegmentReader matchingSegmentReader = mergeState.matchingSegmentReaders[idx++];
Lucene40TermVectorsReader matchingVectorsReader = null;
if (matchingSegmentReader != null) {
@ -353,8 +348,8 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
final int maxDoc = reader.maxDoc();
final Bits liveDocs = reader.getLiveDocs();
int totalNumDocs = 0;
if (matchingVectorsReader != null && mergeState.currentReaderPayloadProcessor == null) {
// We can bulk-copy because the fieldInfos are "congruent" and there is no payload processor
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
for (int docNum = 0; docNum < maxDoc;) {
if (!liveDocs.get(docNum)) {
// skip deleted docs
@ -404,8 +399,8 @@ public final class Lucene40TermVectorsWriter extends TermVectorsWriter {
int rawDocLengths2[])
throws IOException {
final int maxDoc = reader.maxDoc();
if (matchingVectorsReader != null && mergeState.currentReaderPayloadProcessor == null) {
// We can bulk-copy because the fieldInfos are "congruent" and there is no payload processor
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
int docCount = 0;
while (docCount < maxDoc) {
int len = Math.min(MAX_RAW_MERGE_DOCS, maxDoc - docCount);

View File

@ -260,9 +260,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// to allow users to query an IndexWriter settings.
private final LiveIndexWriterConfig config;
// The PayloadProcessorProvider to use when segments are merged
private PayloadProcessorProvider payloadProcessorProvider;
DirectoryReader getReader() throws IOException {
return getReader(true);
}
@ -2406,8 +2403,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
false, codec, null, null);
SegmentMerger merger = new SegmentMerger(info, infoStream, trackingDir, config.getTermIndexInterval(),
MergeState.CheckAbort.NONE, payloadProcessorProvider,
globalFieldNumberMap, context);
MergeState.CheckAbort.NONE, globalFieldNumberMap, context);
for (IndexReader reader : readers) { // add new indexes
merger.add(reader);
@ -3510,7 +3506,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory);
SegmentMerger merger = new SegmentMerger(merge.info.info, infoStream, dirWrapper, config.getTermIndexInterval(), checkAbort,
payloadProcessorProvider, globalFieldNumberMap, context);
globalFieldNumberMap, context);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merging " + segString(merge.segments));
@ -4065,38 +4061,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
synchronized void deletePendingFiles() throws IOException {
deleter.deletePendingFiles();
}
/**
* Sets the {@link PayloadProcessorProvider} to use when merging payloads.
* Note that the given <code>pcp</code> will be invoked for every segment that
* is merged, not only external ones that are given through
* {@link #addIndexes}. If you want only the payloads of the external segments
* to be processed, you can return <code>null</code> whenever a
* {@link PayloadProcessorProvider.ReaderPayloadProcessor} is requested for the {@link Directory} of the
* {@link IndexWriter}.
* <p>
* The default is <code>null</code> which means payloads are processed
* normally (copied) during segment merges. You can also unset it by passing
* <code>null</code>.
* <p>
* <b>NOTE:</b> the set {@link PayloadProcessorProvider} will be in effect
* immediately, potentially for already running merges too. If you want to be
* sure it is used for further operations only, such as {@link #addIndexes} or
* {@link #forceMerge}, you can call {@link #waitForMerges()} before.
*/
public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
ensureOpen();
payloadProcessorProvider = pcp;
}
/**
* Returns the {@link PayloadProcessorProvider} that is used during segment
* merges to process payloads.
*/
public PayloadProcessorProvider getPayloadProcessorProvider() {
ensureOpen();
return payloadProcessorProvider;
}
/**
* NOTE: this method creates a compound file for all files returned by

View File

@ -19,8 +19,6 @@ package org.apache.lucene.index;
import java.util.List;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.ReaderPayloadProcessor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.InfoStream;
@ -194,14 +192,6 @@ public class MergeState {
// Updated per field;
public FieldInfo fieldInfo;
// Used to process payloads
// TODO: this is a FactoryFactory here basically
// and we could make a codec(wrapper) to do all of this privately so IW is uninvolved
public PayloadProcessorProvider payloadProcessorProvider;
public ReaderPayloadProcessor[] readerPayloadProcessor;
public ReaderPayloadProcessor currentReaderPayloadProcessor;
public PayloadProcessor[] currentPayloadProcessor;
// TODO: get rid of this? it tells you which segments are 'aligned' (e.g. for bulk merging)
// but is this really so expensive to compute again in different components, versus once in SM?
public SegmentReader[] matchingSegmentReaders;

View File

@ -1,81 +0,0 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
/**
* Provides a {@link ReaderPayloadProcessor} to be used for a {@link Directory}.
* This allows using different {@link ReaderPayloadProcessor}s for different
* source {@link AtomicReader}, for e.g. to perform different processing of payloads of
* different directories.
* <p>
* <b>NOTE:</b> to avoid processing payloads of certain directories, you can
* return <code>null</code> in {@link #getReaderProcessor}.
* <p>
* <b>NOTE:</b> it is possible that the same {@link ReaderPayloadProcessor} will be
* requested for the same {@link Directory} concurrently. Therefore, to avoid
* concurrency issues you should return different instances for different
* threads. Usually, if your {@link ReaderPayloadProcessor} does not maintain state
* this is not a problem. The merge code ensures that the
* {@link ReaderPayloadProcessor} instance you return will be accessed by one
* thread to obtain the {@link PayloadProcessor}s for different terms.
*
* @lucene.experimental
*/
public abstract class PayloadProcessorProvider {
/**
* Returns a {@link PayloadProcessor} for a given {@link Term} which allows
* processing the payloads of different terms differently. If you intent to
* process all your payloads the same way, then you can ignore the given term.
* <p>
* <b>NOTE:</b> if you protect your {@link ReaderPayloadProcessor} from
* concurrency issues, then you shouldn't worry about any such issues when
* {@link PayloadProcessor}s are requested for different terms.
*/
public static abstract class ReaderPayloadProcessor {
/** Returns a {@link PayloadProcessor} for the given term. */
public abstract PayloadProcessor getProcessor(String field, BytesRef text) throws IOException;
}
/**
* Processes the given payload.
*
* @lucene.experimental
*/
public static abstract class PayloadProcessor {
/** Process the incoming payload and stores the result in the given {@link BytesRef}. */
public abstract void processPayload(BytesRef payload) throws IOException;
}
/**
* Returns a {@link ReaderPayloadProcessor} for the given {@link Directory},
* through which {@link PayloadProcessor}s can be obtained for each
* {@link Term}, or <code>null</code> if none should be used.
*/
public abstract ReaderPayloadProcessor getReaderProcessor(AtomicReader reader) throws IOException;
}

View File

@ -56,13 +56,11 @@ final class SegmentMerger {
// note, just like in codec apis Directory 'dir' is NOT the same as segmentInfo.dir!!
SegmentMerger(SegmentInfo segmentInfo, InfoStream infoStream, Directory dir, int termIndexInterval,
MergeState.CheckAbort checkAbort, PayloadProcessorProvider payloadProcessorProvider,
FieldInfos.FieldNumbers fieldNumbers, IOContext context) {
MergeState.CheckAbort checkAbort, FieldInfos.FieldNumbers fieldNumbers, IOContext context) {
mergeState.segmentInfo = segmentInfo;
mergeState.infoStream = infoStream;
mergeState.readers = new ArrayList<AtomicReader>();
mergeState.checkAbort = checkAbort;
mergeState.payloadProcessorProvider = payloadProcessorProvider;
directory = dir;
this.termIndexInterval = termIndexInterval;
this.codec = segmentInfo.getCodec();
@ -274,8 +272,6 @@ final class SegmentMerger {
// Remap docIDs
mergeState.docMaps = new MergeState.DocMap[numReaders];
mergeState.docBase = new int[numReaders];
mergeState.readerPayloadProcessor = new PayloadProcessorProvider.ReaderPayloadProcessor[numReaders];
mergeState.currentPayloadProcessor = new PayloadProcessorProvider.PayloadProcessor[numReaders];
int docBase = 0;
@ -289,10 +285,6 @@ final class SegmentMerger {
mergeState.docMaps[i] = docMap;
docBase += docMap.numDocs();
if (mergeState.payloadProcessorProvider != null) {
mergeState.readerPayloadProcessor[i] = mergeState.payloadProcessorProvider.getReaderProcessor(reader);
}
i++;
}

View File

@ -204,7 +204,7 @@ public class TestDoc extends LuceneTestCase {
final SegmentInfo si = new SegmentInfo(si1.info.dir, Constants.LUCENE_MAIN_VERSION, merged, -1, false, codec, null, null);
SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), trackingDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
MergeState.CheckAbort.NONE, null, new FieldInfos.FieldNumbers(), context);
MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), context);
merger.add(r1);
merger.add(r2);

View File

@ -1,271 +0,0 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenizer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.PayloadProcessorProvider.ReaderPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.junit.Test;
public class TestPayloadProcessorProvider extends LuceneTestCase {
private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
private final Map<Directory, ReaderPayloadProcessor> processors;
public PerDirPayloadProcessor(Map<Directory, ReaderPayloadProcessor> processors) {
this.processors = processors;
}
@Override
public ReaderPayloadProcessor getReaderProcessor(AtomicReader reader) {
if (reader instanceof SegmentReader) {
return processors.get(((SegmentReader) reader).directory());
} else {
throw new UnsupportedOperationException("This shouldnot happen in this test: Reader is no SegmentReader");
}
}
}
private static final class PerTermPayloadProcessor extends ReaderPayloadProcessor {
@Override
public PayloadProcessor getProcessor(String field, BytesRef text) {
// don't process payloads of terms other than "p:p1"
if (!field.equals("p") || !text.bytesEquals(new BytesRef("p1"))) {
return null;
}
// All other terms are processed the same way
return new DeletePayloadProcessor();
}
}
/** deletes the incoming payload */
private static final class DeletePayloadProcessor extends PayloadProcessor {
@Override
public void processPayload(BytesRef payload) {
payload.length = 0;
}
}
private static final class PayloadTokenStream extends TokenStream {
private final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
private boolean called = false;
private String t;
public PayloadTokenStream(String t) {
this.t = t;
}
@Override
public boolean incrementToken() {
if (called) {
return false;
}
called = true;
byte[] p = new byte[] { 1 };
payload.setPayload(new BytesRef(p));
term.append(t);
return true;
}
@Override
public void reset() throws IOException {
super.reset();
called = false;
term.setEmpty();
}
}
private static final int NUM_DOCS = 10;
private void populateDirs(Random random, Directory[] dirs, boolean multipleCommits)
throws IOException {
for (int i = 0; i < dirs.length; i++) {
dirs[i] = newDirectory();
populateDocs(random, dirs[i], multipleCommits);
verifyPayloadExists(dirs[i], "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dirs[i], "p", new BytesRef("p2"), NUM_DOCS);
}
}
private void populateDocs(Random random, Directory dir, boolean multipleCommits)
throws IOException {
IndexWriter writer = new IndexWriter(
dir,
newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random, MockTokenizer.WHITESPACE, false)).
setMergePolicy(newLogMergePolicy(10))
);
TokenStream payloadTS1 = new PayloadTokenStream("p1");
TokenStream payloadTS2 = new PayloadTokenStream("p2");
FieldType customType = new FieldType(TextField.TYPE_NOT_STORED);
customType.setOmitNorms(true);
for (int i = 0; i < NUM_DOCS; i++) {
Document doc = new Document();
doc.add(newField("id", "doc" + i, customType));
doc.add(newTextField("content", "doc content " + i, Field.Store.NO));
if (random.nextBoolean()) {
doc.add(new TextField("p", payloadTS1));
doc.add(new TextField("p", payloadTS2));
} else {
FieldType type = new FieldType(TextField.TYPE_NOT_STORED);
type.setStoreTermVectors(true);
type.setStoreTermVectorPositions(true);
type.setStoreTermVectorPayloads(true);
type.setStoreTermVectorOffsets(random.nextBoolean());
doc.add(new Field("p", payloadTS1, type));
doc.add(new Field("p", payloadTS2, type));
}
writer.addDocument(doc);
if (multipleCommits && (i % 4 == 0)) {
writer.commit();
}
}
writer.close();
}
private void verifyPayloadExists(Directory dir, String field, BytesRef text, int numExpected)
throws IOException {
IndexReader reader = DirectoryReader.open(dir);
try {
int numPayloads = 0;
DocsAndPositionsEnum tpe = MultiFields.getTermPositionsEnum(reader, null, field, text);
while (tpe.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
tpe.nextPosition();
BytesRef payload = tpe.getPayload();
if (payload != null) {
assertEquals(1, payload.length);
assertEquals(1, payload.bytes[0]);
++numPayloads;
}
}
assertEquals(numExpected, numPayloads);
} finally {
reader.close();
}
}
private void doTest(Random random, boolean addToEmptyIndex,
int numExpectedPayloads, boolean multipleCommits) throws IOException {
Directory[] dirs = new Directory[2];
populateDirs(random, dirs, multipleCommits);
Directory dir = newDirectory();
if (!addToEmptyIndex) {
populateDocs(random, dir, multipleCommits);
verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
}
// Add two source dirs. By not adding the dest dir, we ensure its payloads
// won't get processed.
Map<Directory, ReaderPayloadProcessor> processors = new HashMap<Directory, ReaderPayloadProcessor>();
for (Directory d : dirs) {
processors.put(d, new PerTermPayloadProcessor());
}
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random, MockTokenizer.WHITESPACE, false)));
writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
IndexReader[] readers = new IndexReader[dirs.length];
for (int i = 0; i < readers.length; i++) {
readers[i] = DirectoryReader.open(dirs[i]);
}
try {
writer.addIndexes(readers);
} finally {
for (IndexReader r : readers) {
r.close();
}
}
writer.close();
verifyPayloadExists(dir, "p", new BytesRef("p1"), numExpectedPayloads);
// the second term should always have all payloads
numExpectedPayloads = NUM_DOCS * dirs.length
+ (addToEmptyIndex ? 0 : NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), numExpectedPayloads);
for (Directory d : dirs)
d.close();
dir.close();
}
@Test
public void testAddIndexes() throws Exception {
// addIndexes - single commit in each
doTest(random(), true, 0, false);
// addIndexes - multiple commits in each
doTest(random(), true, 0, true);
}
@Test
public void testAddIndexesIntoExisting() throws Exception {
// addIndexes - single commit in each
doTest(random(), false, NUM_DOCS, false);
// addIndexes - multiple commits in each
doTest(random(), false, NUM_DOCS, true);
}
@Test
public void testRegularMerges() throws Exception {
Directory dir = newDirectory();
populateDocs(random(), dir, true);
verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
// Add two source dirs. By not adding the dest dir, we ensure its payloads
// won't get processed.
Map<Directory, ReaderPayloadProcessor> processors = new HashMap<Directory, ReaderPayloadProcessor>();
processors.put(dir, new PerTermPayloadProcessor());
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false)));
writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
writer.forceMerge(1);
writer.close();
verifyPayloadExists(dir, "p", new BytesRef("p1"), 0);
verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
dir.close();
}
}

View File

@ -82,7 +82,7 @@ public class TestSegmentMerger extends LuceneTestCase {
final SegmentInfo si = new SegmentInfo(mergedDir, Constants.LUCENE_MAIN_VERSION, mergedSegment, -1, false, codec, null, null);
SegmentMerger merger = new SegmentMerger(si, InfoStream.getDefault(), mergedDir, IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL,
MergeState.CheckAbort.NONE, null, new FieldInfos.FieldNumbers(), newIOContext(random()));
MergeState.CheckAbort.NONE, new FieldInfos.FieldNumbers(), newIOContext(random()));
merger.add(reader1);
merger.add(reader2);
MergeState mergeState = merger.merge();

View File

@ -1,17 +1,20 @@
package org.apache.lucene.facet.example.merge;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.PayloadProcessorProvider;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.facet.example.ExampleUtils;
import org.apache.lucene.facet.index.FacetsPayloadProcessorProvider;
import org.apache.lucene.facet.index.OrdinalMappingAtomicReader;
import org.apache.lucene.facet.index.params.DefaultFacetIndexingParams;
import org.apache.lucene.facet.index.params.FacetIndexingParams;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.DiskOrdinalMap;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.MemoryOrdinalMap;
@ -84,13 +87,17 @@ public class TaxonomyMergeUtils {
// merge the taxonomies
destTaxWriter.addTaxonomy(srcTaxDir, map);
PayloadProcessorProvider payloadProcessor = new FacetsPayloadProcessorProvider(
srcIndexDir, map.getMap(), new DefaultFacetIndexingParams());
destIndexWriter.setPayloadProcessorProvider(payloadProcessor);
int ordinalMap[] = map.getMap();
FacetIndexingParams params = new DefaultFacetIndexingParams();
IndexReader reader = DirectoryReader.open(srcIndexDir);
DirectoryReader reader = DirectoryReader.open(srcIndexDir, -1);
List<AtomicReaderContext> leaves = reader.leaves();
AtomicReader wrappedLeaves[] = new AtomicReader[leaves.size()];
for (int i = 0; i < leaves.size(); i++) {
wrappedLeaves[i] = new OrdinalMappingAtomicReader(leaves.get(i).reader(), ordinalMap, params);
}
try {
destIndexWriter.addIndexes(reader);
destIndexWriter.addIndexes(new MultiReader(wrappedLeaves));
// commit changes to taxonomy and index respectively.
destTaxWriter.commit();

View File

@ -1,197 +0,0 @@
package org.apache.lucene.facet.index;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.PayloadProcessorProvider;
import org.apache.lucene.index.PayloadProcessorProvider.ReaderPayloadProcessor; // javadocs
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.facet.index.params.CategoryListParams;
import org.apache.lucene.facet.index.params.FacetIndexingParams;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.OrdinalMap;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.encoding.IntDecoder;
import org.apache.lucene.util.encoding.IntEncoder;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A {@link PayloadProcessorProvider} for updating facets ordinal references,
* based on an ordinal map. You should use this code in conjunction with merging
* taxonomies - after you merge taxonomies, you receive an {@link OrdinalMap}
* which maps the 'old' payloads to the 'new' ones. You can use that map to
* re-map the payloads which contain the facets information (ordinals) either
* before or while merging the indexes.
* <p>
* For re-mapping the ordinals before you merge the indexes, do the following:
*
* <pre>
* // merge the old taxonomy with the new one.
* OrdinalMap map = LuceneTaxonomyWriter.addTaxonomies();
* int[] ordmap = map.getMap();
*
* // re-map the ordinals on the old directory.
* Directory oldDir;
* FacetsPayloadProcessorProvider fppp = new FacetsPayloadProcessorProvider(
* oldDir, ordmap);
* IndexWriterConfig conf = new IndexWriterConfig(VER, ANALYZER);
* conf.setMergePolicy(new ForceOptimizeMergePolicy());
* IndexWriter writer = new IndexWriter(oldDir, conf);
* writer.setPayloadProcessorProvider(fppp);
* writer.forceMerge(1);
* writer.close();
*
* // merge that directory with the new index.
* IndexWriter newWriter; // opened on the 'new' Directory
* newWriter.addIndexes(oldDir);
* newWriter.commit();
* </pre>
*
* For re-mapping the ordinals during index merge, do the following:
*
* <pre>
* // merge the old taxonomy with the new one.
* OrdinalMap map = LuceneTaxonomyWriter.addTaxonomies();
* int[] ordmap = map.getMap();
*
* // Add the index and re-map ordinals on the go
* IndexReader r = IndexReader.open(oldDir);
* IndexWriterConfig conf = new IndexWriterConfig(VER, ANALYZER);
* IndexWriter writer = new IndexWriter(newDir, conf);
* writer.setPayloadProcessorProvider(fppp);
* writer.addIndexes(r);
* writer.commit();
* </pre>
* <p>
* <b>NOTE:</b> while the second example looks simpler, IndexWriter may trigger
* a long merge due to addIndexes. The first example avoids this perhaps
* unneeded merge, as well as can be done separately (e.g. on another node)
* before the index is merged.
*
* @lucene.experimental
*/
public class FacetsPayloadProcessorProvider extends PayloadProcessorProvider {
private final Directory workDir;
private final ReaderPayloadProcessor dirProcessor;
/**
* Construct FacetsPayloadProcessorProvider with FacetIndexingParams
*
* @param dir the {@link Directory} containing the segments to update
* @param ordinalMap an array mapping previous facets ordinals to new ones
* @param indexingParams the facets indexing parameters
*/
public FacetsPayloadProcessorProvider(Directory dir, int[] ordinalMap,
FacetIndexingParams indexingParams) {
workDir = dir;
dirProcessor = new FacetsDirPayloadProcessor(indexingParams, ordinalMap);
}
@Override
public ReaderPayloadProcessor getReaderProcessor(AtomicReader reader) throws IOException {
if (reader instanceof SegmentReader) {
if (workDir == ((SegmentReader) reader).directory()) {
return dirProcessor;
}
}
return null;
}
/**
* {@link ReaderPayloadProcessor} that processes
* facet ordinals according to the passed in {@link FacetIndexingParams}.
*/
public static class FacetsDirPayloadProcessor extends ReaderPayloadProcessor {
private final Map<Term, CategoryListParams> termMap = new HashMap<Term, CategoryListParams>(1);
private final int[] ordinalMap;
/**
* Construct FacetsDirPayloadProcessor with custom FacetIndexingParams
* @param ordinalMap an array mapping previous facets ordinals to new ones
* @param indexingParams the facets indexing parameters
*/
protected FacetsDirPayloadProcessor(FacetIndexingParams indexingParams, int[] ordinalMap) {
this.ordinalMap = ordinalMap;
for (CategoryListParams params: indexingParams.getAllCategoryListParams()) {
termMap.put(params.getTerm(), params);
}
}
@Override
public PayloadProcessor getProcessor(String field, BytesRef bytes) throws IOException {
// TODO (Facet): don't create terms
CategoryListParams params = termMap.get(new Term(field, bytes));
if (params == null) {
return null;
}
return new FacetsPayloadProcessor(params, ordinalMap);
}
}
/** A PayloadProcessor for updating facets ordinal references, based on an ordinal map */
public static class FacetsPayloadProcessor extends PayloadProcessor {
private final IntEncoder encoder;
private final IntDecoder decoder;
private final int[] ordinalMap;
private final ByteArrayOutputStream os = new ByteArrayOutputStream();
/**
* @param params defines the encoding of facet ordinals as payload
* @param ordinalMap an array mapping previous facets ordinals to new ones
*/
protected FacetsPayloadProcessor(CategoryListParams params, int[] ordinalMap) {
encoder = params.createEncoder();
decoder = encoder.createMatchingDecoder();
this.ordinalMap = ordinalMap;
}
@Override
public void processPayload(BytesRef payload) throws IOException {
InputStream is = new ByteArrayInputStream(payload.bytes, payload.offset, payload.length);
decoder.reInit(is);
os.reset();
encoder.reInit(os);
long ordinal;
while ((ordinal = decoder.decode()) != IntDecoder.EOS) {
int newOrdinal = ordinalMap[(int)ordinal];
encoder.encode(newOrdinal);
}
encoder.close();
// TODO (Facet): avoid copy?
byte out[] = os.toByteArray();
payload.bytes = out;
payload.offset = 0;
payload.length = out.length;
}
}
}

View File

@ -0,0 +1,224 @@
package org.apache.lucene.facet.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.facet.index.params.CategoryListParams;
import org.apache.lucene.facet.index.params.DefaultFacetIndexingParams;
import org.apache.lucene.facet.index.params.FacetIndexingParams;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.OrdinalMap;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.DocsAndPositionsEnum;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterAtomicReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.encoding.IntDecoder;
import org.apache.lucene.util.encoding.IntEncoder;
/**
* A {@link FilterAtomicReader} for updating facets ordinal references,
* based on an ordinal map. You should use this code in conjunction with merging
* taxonomies - after you merge taxonomies, you receive an {@link OrdinalMap}
* which maps the 'old' payloads to the 'new' ones. You can use that map to
* re-map the payloads which contain the facets information (ordinals) either
* before or while merging the indexes.
* <p>
* For re-mapping the ordinals during index merge, do the following:
*
* <pre class="prettyprint">
* // merge the old taxonomy with the new one.
* OrdinalMap map = DirectoryTaxonomyWriter.addTaxonomies();
* int[] ordmap = map.getMap();
*
* // Add the index and re-map ordinals on the go
* DirectoryReader reader = DirectoryReader.open(oldDir);
* IndexWriterConfig conf = new IndexWriterConfig(VER, ANALYZER);
* IndexWriter writer = new IndexWriter(newDir, conf);
* List<AtomicReaderContext> leaves = reader.leaves();
* AtomicReader wrappedLeaves[] = new AtomicReader[leaves.size()];
* for (int i = 0; i < leaves.size(); i++) {
* wrappedLeaves[i] = new OrdinalMappingAtomicReader(leaves.get(i).reader(), ordmap);
* }
* writer.addIndexes(new MultiReader(wrappedLeaves));
* writer.commit();
* </pre>
*
* @lucene.experimental
*/
public class OrdinalMappingAtomicReader extends FilterAtomicReader {
private final int[] ordinalMap;
// a little obtuse: but we dont need to create Term objects this way
private final Map<String,Map<BytesRef,CategoryListParams>> termMap =
new HashMap<String,Map<BytesRef,CategoryListParams>>(1);
/**
* Wraps an AtomicReader, mapping ordinals according to the ordinalMap.
* Calls {@link #OrdinalMappingAtomicReader(AtomicReader, int[], FacetIndexingParams)
* OrdinalMappingAtomicReader(in, ordinalMap, new DefaultFacetIndexingParams())}
*/
public OrdinalMappingAtomicReader(AtomicReader in, int[] ordinalMap) {
this(in, ordinalMap, new DefaultFacetIndexingParams());
}
/**
* Wraps an AtomicReader, mapping ordinals according to the ordinalMap,
* using the provided indexingParams.
*/
public OrdinalMappingAtomicReader(AtomicReader in, int[] ordinalMap, FacetIndexingParams indexingParams) {
super(in);
this.ordinalMap = ordinalMap;
for (CategoryListParams params: indexingParams.getAllCategoryListParams()) {
Term term = params.getTerm();
Map<BytesRef,CategoryListParams> fieldMap = termMap.get(term.field());
if (fieldMap == null) {
fieldMap = new HashMap<BytesRef,CategoryListParams>(1);
termMap.put(term.field(), fieldMap);
}
fieldMap.put(term.bytes(), params);
}
}
@Override
public Fields getTermVectors(int docID) throws IOException {
Fields fields = super.getTermVectors(docID);
if (fields == null) {
return null;
} else {
return new OrdinalMappingFields(fields);
}
}
@Override
public Fields fields() throws IOException {
Fields fields = super.fields();
if (fields == null) {
return null;
} else {
return new OrdinalMappingFields(fields);
}
}
private class OrdinalMappingFields extends FilterFields {
public OrdinalMappingFields(Fields in) {
super(in);
}
@Override
public Terms terms(String field) throws IOException {
Terms terms = super.terms(field);
if (terms == null) {
return terms;
}
Map<BytesRef,CategoryListParams> termsMap = termMap.get(field);
if (termsMap == null) {
return terms;
} else {
return new OrdinalMappingTerms(terms, termsMap);
}
}
}
private class OrdinalMappingTerms extends FilterTerms {
private final Map<BytesRef,CategoryListParams> termsMap;
public OrdinalMappingTerms(Terms in, Map<BytesRef,CategoryListParams> termsMap) {
super(in);
this.termsMap = termsMap;
}
@Override
public TermsEnum iterator(TermsEnum reuse) throws IOException {
// TODO: should we reuse the inner termsenum?
return new OrdinalMappingTermsEnum(super.iterator(reuse), termsMap);
}
}
private class OrdinalMappingTermsEnum extends FilterTermsEnum {
private final Map<BytesRef,CategoryListParams> termsMap;
public OrdinalMappingTermsEnum(TermsEnum in, Map<BytesRef,CategoryListParams> termsMap) {
super(in);
this.termsMap = termsMap;
}
@Override
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs, DocsAndPositionsEnum reuse, int flags) throws IOException {
// TODO: we could reuse our D&P enum if we need
DocsAndPositionsEnum inner = super.docsAndPositions(liveDocs, reuse, flags);
if (inner == null) {
return inner;
}
CategoryListParams params = termsMap.get(term());
if (params == null) {
return inner;
}
return new OrdinalMappingDocsAndPositionsEnum(inner, params);
}
}
private class OrdinalMappingDocsAndPositionsEnum extends FilterDocsAndPositionsEnum {
private final IntEncoder encoder;
private final IntDecoder decoder;
private final ByteArrayOutputStream os = new ByteArrayOutputStream();
private final BytesRef payloadOut = new BytesRef();
public OrdinalMappingDocsAndPositionsEnum(DocsAndPositionsEnum in, CategoryListParams params) {
super(in);
encoder = params.createEncoder();
decoder = encoder.createMatchingDecoder();
}
@Override
public BytesRef getPayload() throws IOException {
BytesRef payload = super.getPayload();
if (payload == null) {
return payload;
} else {
InputStream is = new ByteArrayInputStream(payload.bytes, payload.offset, payload.length);
decoder.reInit(is);
os.reset();
encoder.reInit(os);
long ordinal;
while ((ordinal = decoder.decode()) != IntDecoder.EOS) {
int newOrdinal = ordinalMap[(int)ordinal];
encoder.encode(newOrdinal);
}
encoder.close();
// TODO (Facet): avoid copy?
byte out[] = os.toByteArray();
payloadOut.bytes = out;
payloadOut.offset = 0;
payloadOut.length = out.length;
return payloadOut;
}
}
}
}