mirror of https://github.com/apache/lucene.git
LUCENE-5675, LUCENE-5693: improve javadocs, disallow term vectors, fix precommit issues, remove trivial diffs, add new test case
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/lucene5675@1596938 13f79535-47bb-0310-9956-ffa450edef68
parent f2afbded2f
commit 39f48c60e3
IDVersionPostingsFormat.java

@@ -24,21 +24,35 @@ import org.apache.lucene.codecs.FieldsProducer;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.codecs.PostingsReaderBase;
 import org.apache.lucene.codecs.PostingsWriterBase;
 import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
 import org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter;
 import org.apache.lucene.index.SegmentReadState;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.search.LiveFieldValues;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 
-/** A PostingsFormat for primary-key (ID) fields, that associates a
- *  long version with each ID and enables fast (using only the terms index)
- *  lookup for whether a given ID may have a version > N.
+/** A PostingsFormat optimized for primary-key (ID) fields that also
+ *  record a version (long) for each ID, delivered as a payload
+ *  created by {@link #longToBytes} during indexing.  At search time,
+ *  the TermsEnum implementation {@link IDVersionSegmentTermsEnum}
+ *  enables fast (using only the terms index when possible) lookup for
+ *  whether a given ID was previously indexed with version > N (see
+ *  {@link IDVersionSegmentTermsEnum#seekExact(BytesRef,long)}).
  *
- *  The field is indexed as DOCS_ONLY, but the user must feed in the
+ *  <p>This is most effective if the app assigns monotonically
+ *  increasing global version to each indexed doc.  Then, during
+ *  indexing, use {@link
+ *  IDVersionSegmentTermsEnum#seekExact(BytesRef,long)} (along with
+ *  {@link LiveFieldValues}) to decide whether the document you are
+ *  about to index was already indexed with a higher version, and skip
+ *  it if so.
+ *
+ *  <p>The field is effectively indexed as DOCS_ONLY and the docID is
+ *  pulsed into the terms dictionary, but the user must feed in the
  *  version as a payload on the first token.
  *
- *  The docID and version for each ID is inlined into the terms dict.
+ *  <p>NOTE: term vectors cannot be indexed with this field (not that
+ *  you should really ever want to do this).
  *
  *  @lucene.experimental */
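To make the new javadoc concrete, here is a hedged indexing sketch assembled from pieces this commit itself exercises (SingleTokenWithPayloadTokenStream, StringAndPayloadField.TYPE, and IDVersionPostingsFormat.longToBytes); the names id, version, and writer are placeholders, not part of the commit:

// Sketch: index one primary-key term whose 8-byte version payload rides
// on the field's single token, as IDVersionPostingsFormat expects.
SingleTokenWithPayloadTokenStream ts = new SingleTokenWithPayloadTokenStream();
BytesRef payload = new BytesRef(8);
payload.length = 8;
IDVersionPostingsFormat.longToBytes(version, payload); // encode the version
ts.setValue(id, payload);                              // single token carrying the payload
Document doc = new Document();
doc.add(new Field("id", ts, StringAndPayloadField.TYPE));
writer.addDocument(doc);

The codec must route the "id" field to this postings format; the tests below do it with TestUtil.alwaysPostingsFormat(new IDVersionPostingsFormat()).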
IDVersionPostingsWriter.java

@@ -24,10 +24,8 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.PushPostingsWriterBase;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.TermState;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 
 final class IDVersionPostingsWriter extends PushPostingsWriterBase {

@@ -67,6 +65,11 @@ final class IDVersionPostingsWriter extends PushPostingsWriterBase {
     if (fieldInfo.getIndexOptions() != FieldInfo.IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) {
       throw new IllegalArgumentException("field must be indexed using IndexOptions.DOCS_AND_FREQS_AND_POSITIONS");
     }
+    // LUCENE-5693: because CheckIndex cross-checks term vectors with postings even for deleted docs, and because our PF only indexes the
+    // non-deleted documents on flush, CheckIndex will see this as corruption:
+    if (fieldInfo.hasVectors()) {
+      throw new IllegalArgumentException("field cannot index term vectors: CheckIndex will report this as index corruption");
+    }
     lastState = emptyState;
     return 0;
   }
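For reference, a minimal sketch of a FieldType that passes both checks above, assuming the 4.x FieldType API (in this commit, StringAndPayloadField.TYPE plays this role):

// Assumed setup: positions enabled so the version payload can ride on the
// first token; term vectors off, since setField now rejects them (LUCENE-5693).
FieldType ft = new FieldType();
ft.setIndexed(true);
ft.setTokenized(true);
ft.setIndexOptions(FieldInfo.IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
ft.setStoreTermVectors(false);
ft.freeze();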
IDVersionSegmentTermsEnum.java

@@ -32,14 +32,14 @@ import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.fst.ByteSequenceOutputs;
 import org.apache.lucene.util.fst.FST;
 import org.apache.lucene.util.fst.Outputs;
 import org.apache.lucene.util.fst.PairOutputs.Pair;
 import org.apache.lucene.util.fst.PairOutputs;
 import org.apache.lucene.util.fst.Util;
 
-/** Iterates through terms in this field */
+/** Iterates through terms in this field; this class is public so users
+ *  can cast it to call {@link #seekExact(BytesRef, long)} for
+ *  optimistic concurrency, and also {@link #getVersion} to get the
+ *  version of the currently seek'd term. */
 public final class IDVersionSegmentTermsEnum extends TermsEnum {
 
   // Lazy init:
@@ -225,12 +225,17 @@ public final class IDVersionSegmentTermsEnum extends TermsEnum {
     }
   }
 
-  /** Only valid if we are positioned. */
+  /** Get the version of the currently seek'd term; only valid if we are
+   *  positioned. */
   public long getVersion() {
     return ((IDVersionTermState) currentFrame.state).idVersion;
   }
 
-  /** Returns false if the term does not exist, or it exists but its version is too old (< minIDVersion). */
+  /** Optimized version of {@link #seekExact(BytesRef)} that can
+   *  sometimes fail-fast if the version indexed with the requested ID
+   *  is less than the specified minIDVersion.  Applications that index
+   *  a monotonically increasing global version with each document can
+   *  use this for fast optimistic concurrency. */
   public boolean seekExact(final BytesRef target, long minIDVersion) throws IOException {
 
     if (fr.index == null) {
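A hedged sketch of how an application would use this fail-fast path per the javadoc above; terms (the "id" field's Terms), id, and newVersion are assumed to exist in the caller:

// The cast is valid when the field was indexed with IDVersionPostingsFormat;
// that is exactly why IDVersionSegmentTermsEnum is public.
IDVersionSegmentTermsEnum idEnum = (IDVersionSegmentTermsEnum) terms.iterator(null);
if (idEnum.seekExact(new BytesRef(id), newVersion + 1)) {
  // Some version > newVersion is already indexed: drop this stale update.
  System.out.println("stale: winning version=" + idEnum.getVersion());
} else {
  // ID absent, or its indexed version is <= newVersion: proceed to index it.
}

This mirrors what the new testGlobalVersions test does through its PerThreadVersionPKLookup helper.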
VersionBlockTreeTermsWriter.java

@@ -34,7 +34,6 @@ import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMOutputStream;
 import org.apache.lucene.util.ArrayUtil;

@@ -47,7 +46,6 @@ import org.apache.lucene.util.fst.ByteSequenceOutputs;
 import org.apache.lucene.util.fst.BytesRefFSTEnum;
 import org.apache.lucene.util.fst.FST;
 import org.apache.lucene.util.fst.NoOutputs;
 import org.apache.lucene.util.fst.Outputs;
 import org.apache.lucene.util.fst.PairOutputs.Pair;
 import org.apache.lucene.util.fst.PairOutputs;
 import org.apache.lucene.util.fst.PositiveIntOutputs;

@@ -166,7 +164,7 @@ final class VersionBlockTreeTermsWriter extends FieldsConsumer {
   }
 
   private final List<FieldMetaData> fields = new ArrayList<>();
-  private final String segment;
+  // private final String segment;
 
   /** Create a new writer. The number of items (terms or
    *  sub-blocks) per block will aim to be between

@@ -211,7 +209,7 @@ final class VersionBlockTreeTermsWriter extends FieldsConsumer {
     CodecUtil.writeHeader(indexOut, TERMS_INDEX_CODEC_NAME, VERSION_CURRENT);
 
     this.postingsWriter = postingsWriter;
-    segment = state.segmentInfo.name;
+    // segment = state.segmentInfo.name;
 
     // System.out.println("BTW.init seg=" + state.segmentName);

@@ -781,7 +779,7 @@ final class VersionBlockTreeTermsWriter extends FieldsConsumer {
     boolean absolute = true;
     long maxVersionInBlock = -1;
 
-    int countx = 0;
+    // int countx = 0;
     if (isLeafBlock) {
       subIndices = null;
       for (PendingEntry ent : slice) {

@@ -979,12 +977,12 @@ final class VersionBlockTreeTermsWriter extends FieldsConsumer {
                                              longsSize,
                                              minTerm, maxTerm));
     } else {
-      assert docsSeen.cardinality() == 0;
+      // cannot assert this: we skip deleted docIDs in the postings:
+      // assert docsSeen.cardinality() == 0;
     }
   }
 
   private final RAMOutputStream suffixWriter = new RAMOutputStream();
   private final RAMOutputStream statsWriter = new RAMOutputStream();
   private final RAMOutputStream metaWriter = new RAMOutputStream();
   private final RAMOutputStream bytesWriter = new RAMOutputStream();
 }
StringAndPayloadField.java

@@ -62,7 +62,7 @@ class StringAndPayloadField extends Field {
     return ts;
   }
 
-  private static final class SingleTokenWithPayloadTokenStream extends TokenStream {
+  static final class SingleTokenWithPayloadTokenStream extends TokenStream {
 
     private final CharTermAttribute termAttribute = addAttribute(CharTermAttribute.class);
    private final PayloadAttribute payloadAttribute = addAttribute(PayloadAttribute.class);
TestIDVersionPostingsFormat.java

@@ -20,36 +20,34 @@ package org.apache.lucene.codecs.idversion;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.CannedTokenStream;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.analysis.Token;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
 import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.idversion.StringAndPayloadField.SingleTokenWithPayloadTokenStream;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.index.BasePostingsFormatTestCase;
 import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.DocsEnum;
 import org.apache.lucene.index.FieldInfo.IndexOptions;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.MergeScheduler;
 import org.apache.lucene.index.MultiFields;
 import org.apache.lucene.index.PerThreadPKLookup;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.LiveFieldValues;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.LuceneTestCase;
@@ -91,6 +89,98 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
     String next();
   }
 
+  private IDSource getRandomIDs() {
+    IDSource ids;
+    switch (random().nextInt(6)) {
+    case 0:
+      // random simple
+      if (VERBOSE) {
+        System.out.println("TEST: use random simple ids");
+      }
+      ids = new IDSource() {
+          @Override
+          public String next() {
+            return TestUtil.randomSimpleString(random());
+          }
+        };
+      break;
+    case 1:
+      // random realistic unicode
+      if (VERBOSE) {
+        System.out.println("TEST: use random realistic unicode ids");
+      }
+      ids = new IDSource() {
+          @Override
+          public String next() {
+            return TestUtil.randomRealisticUnicodeString(random());
+          }
+        };
+      break;
+    case 2:
+      // sequential
+      if (VERBOSE) {
+        System.out.println("TEST: use sequential ids");
+      }
+      ids = new IDSource() {
+          int upto;
+          @Override
+          public String next() {
+            return Integer.toString(upto++);
+          }
+        };
+      break;
+    case 3:
+      // zero-pad sequential
+      if (VERBOSE) {
+        System.out.println("TEST: use zero-pad sequential ids");
+      }
+      ids = new IDSource() {
+          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
+          final String zeroPad = String.format(Locale.ROOT, "%0" + TestUtil.nextInt(random(), 4, 20) + "d", 0);
+          int upto;
+          @Override
+          public String next() {
+            String s = Integer.toString(upto++);
+            return zeroPad.substring(zeroPad.length() - s.length()) + s;
+          }
+        };
+      break;
+    case 4:
+      // random long
+      if (VERBOSE) {
+        System.out.println("TEST: use random long ids");
+      }
+      ids = new IDSource() {
+          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
+          int upto;
+          @Override
+          public String next() {
+            return Long.toString(random().nextLong() & 0x7ffffffffffffffL, radix);
+          }
+        };
+      break;
+    case 5:
+      // zero-pad random long
+      if (VERBOSE) {
+        System.out.println("TEST: use zero-pad random long ids");
+      }
+      ids = new IDSource() {
+          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
+          final String zeroPad = String.format(Locale.ROOT, "%015d", 0);
+          int upto;
+          @Override
+          public String next() {
+            return Long.toString(random().nextLong() & 0x7ffffffffffffffL, radix);
+          }
+        };
+      break;
+    default:
+      throw new AssertionError();
+    }
+
+    return ids;
+  }
+
   // TODO make a similar test for BT, w/ varied IDs:
 
   public void testRandom() throws Exception {
@@ -108,94 +198,7 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
       System.out.println("TEST: numDocs=" + numDocs);
     }
 
-    IDSource ids;
-    switch (random().nextInt(6)) {
-    case 0:
-      // random simple
-      if (VERBOSE) {
-        System.out.println("  use random simple ids");
-      }
-      ids = new IDSource() {
-          @Override
-          public String next() {
-            return TestUtil.randomSimpleString(random());
-          }
-        };
-      break;
-    case 1:
-      // random realistic unicode
-      if (VERBOSE) {
-        System.out.println("  use random realistic unicode ids");
-      }
-      ids = new IDSource() {
-          @Override
-          public String next() {
-            return TestUtil.randomRealisticUnicodeString(random());
-          }
-        };
-      break;
-    case 2:
-      // sequential
-      if (VERBOSE) {
-        System.out.println("  use seuquential ids");
-      }
-      ids = new IDSource() {
-          int upto;
-          @Override
-          public String next() {
-            return Integer.toString(upto++);
-          }
-        };
-      break;
-    case 3:
-      // zero-pad sequential
-      if (VERBOSE) {
-        System.out.println("  use zero-pad seuquential ids");
-      }
-      ids = new IDSource() {
-          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
-          final String zeroPad = String.format(Locale.ROOT, "%0" + TestUtil.nextInt(random(), 4, 20) + "d", 0);
-          int upto;
-          @Override
-          public String next() {
-            String s = Integer.toString(upto++);
-            return zeroPad.substring(zeroPad.length() - s.length()) + s;
-          }
-        };
-      break;
-    case 4:
-      // random long
-      if (VERBOSE) {
-        System.out.println("  use random long ids");
-      }
-      ids = new IDSource() {
-          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
-          int upto;
-          @Override
-          public String next() {
-            return Long.toString(random().nextLong() & 0x7ffffffffffffffL, radix);
-          }
-        };
-      break;
-    case 5:
-      // zero-pad random long
-      if (VERBOSE) {
-        System.out.println("  use zero-pad random long ids");
-      }
-      ids = new IDSource() {
-          final int radix = TestUtil.nextInt(random(), Character.MIN_RADIX, Character.MAX_RADIX);
-          final String zeroPad = String.format(Locale.ROOT, "%015d", 0);
-          int upto;
-          @Override
-          public String next() {
-            return Long.toString(random().nextLong() & 0x7ffffffffffffffL, radix);
-          }
-        };
-      break;
-    default:
-      throw new AssertionError();
-    }
-
+    IDSource ids = getRandomIDs();
     String idPrefix;
     if (random().nextBoolean()) {
       idPrefix = "";
@@ -515,6 +518,36 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
     dir.close();
   }
 
+  // LUCENE-5693: because CheckIndex cross-checks term vectors with postings even for deleted docs, and because our PF only indexes the
+  // non-deleted documents on flush, CheckIndex will see this as corruption:
+  public void testCannotIndexTermVectors() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    iwc.setCodec(TestUtil.alwaysPostingsFormat(new IDVersionPostingsFormat()));
+    RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
+    Document doc = new Document();
+
+    FieldType ft = new FieldType(StringAndPayloadField.TYPE);
+    ft.setStoreTermVectors(true);
+    SingleTokenWithPayloadTokenStream ts = new SingleTokenWithPayloadTokenStream();
+    BytesRef payload = new BytesRef(8);
+    payload.length = 8;
+    IDVersionPostingsFormat.longToBytes(17, payload);
+    ts.setValue("foo", payload);
+    Field field = new Field("id", ts, ft);
+    doc.add(field);
+    try {
+      w.addDocument(doc);
+      w.commit();
+      fail("didn't hit expected exception");
+    } catch (IllegalArgumentException iae) {
+      // expected
+      // iae.printStackTrace(System.out);
+    }
+    w.close();
+    dir.close();
+  }
+
   public void testMoreThanOnceInSingleDoc() throws IOException {
     Directory dir = newDirectory();
     IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
@@ -533,4 +566,232 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
     w.close();
     dir.close();
   }
 
+  // Simulates optimistic concurrency in a distributed indexing app and confirms the latest version always wins:
+  public void testGlobalVersions() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    iwc.setCodec(TestUtil.alwaysPostingsFormat(new IDVersionPostingsFormat()));
+    final RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
+
+    IDSource idsSource = getRandomIDs();
+    int numIDs = atLeast(100);
+    if (VERBOSE) {
+      System.out.println("TEST: " + numIDs + " ids");
+    }
+    Set<String> idsSeen = new HashSet<String>();
+    while (idsSeen.size() < numIDs) {
+      idsSeen.add(idsSource.next());
+    }
+    final String[] ids = idsSeen.toArray(new String[numIDs]);
+
+    final Object[] locks = new Object[ids.length];
+    for(int i=0;i<locks.length;i++) {
+      locks[i] = new Object();
+    }
+
+    final AtomicLong nextVersion = new AtomicLong();
+
+    final SearcherManager mgr = new SearcherManager(w.w, true, new SearcherFactory());
+
+    final Long missingValue = -1L;
+
+    final LiveFieldValues<IndexSearcher,Long> versionValues = new LiveFieldValues<IndexSearcher,Long>(mgr, missingValue) {
+      @Override
+      protected Long lookupFromSearcher(IndexSearcher s, String id) {
+        // TODO: would be cleaner if we could do our PerThreadLookup here instead of "up above":
+        // We always return missing: the caller then does a lookup against the current reader
+        return missingValue;
+      }
+    };
+
+    // Maps to the version the id was last indexed with:
+    final Map<String,Long> truth = new ConcurrentHashMap<>();
+
+    final CountDownLatch startingGun = new CountDownLatch(1);
+
+    Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 7)];
+
+    final int versionType = random().nextInt(3);
+
+    if (VERBOSE) {
+      if (versionType == 0) {
+        System.out.println("TEST: use random versions");
+      } else if (versionType == 1) {
+        System.out.println("TEST: use monotonic versions");
+      } else {
+        System.out.println("TEST: use nanotime versions");
+      }
+    }
+
+    // Run for 3 sec in normal tests, or 60 seconds in nightly:
+    final long stopTime = System.currentTimeMillis() + (TEST_NIGHTLY ? 60000 : 3000);
+
+    for(int i=0;i<threads.length;i++) {
+      threads[i] = new Thread() {
+          @Override
+          public void run() {
+            try {
+              runForReal();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          private void runForReal() throws IOException, InterruptedException {
+            startingGun.await();
+            PerThreadVersionPKLookup lookup = null;
+            IndexReader lookupReader = null;
+            while (System.currentTimeMillis() < stopTime) {
+
+              // Intentionally pull version first, and then sleep/yield, to provoke version conflicts:
+              long newVersion;
+              if (versionType == 0) {
+                // Random:
+                newVersion = random().nextLong() & 0x7fffffffffffffffL;
+              } else if (versionType == 1) {
+                // Monotonic
+                newVersion = nextVersion.getAndIncrement();
+              } else {
+                newVersion = System.nanoTime();
+              }
+
+              if (versionType != 0) {
+                if (random().nextBoolean()) {
+                  Thread.yield();
+                } else {
+                  Thread.sleep(TestUtil.nextInt(random(), 1, 4));
+                }
+              }
+
+              int x = random().nextInt(ids.length);
+
+              // TODO: we could relax this, if e.g. we assign indexer thread based on ID.  This would ensure a given ID cannot be indexed at
+              // the same time in multiple threads:
+
+              // Only one thread can update an ID at once:
+              synchronized (locks[x]) {
+
+                String id = ids[x];
+
+                // We will attempt to index id with newVersion, but only do so if id wasn't yet indexed, or it was indexed with an older
+                // version (< newVersion):
+
+                // Must lookup the RT value before pulling from the index, in case a reopen happens just after we lookup:
+                Long currentVersion = versionValues.get(id);
+
+                IndexSearcher s = mgr.acquire();
+                try {
+                  if (VERBOSE) System.out.println("\n" + Thread.currentThread().getName() + ": update id=" + id + " newVersion=" + newVersion);
+
+                  if (lookup == null || lookupReader != s.getIndexReader()) {
+                    // TODO: sort of messy; we could add reopen to PerThreadVersionPKLookup?
+                    // TODO: this is thin ice .... that we don't incRef/decRef this reader we are implicitly holding onto:
+                    lookupReader = s.getIndexReader();
+                    if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": open new PK lookup reader=" + lookupReader);
+                    lookup = new PerThreadVersionPKLookup(lookupReader, "id");
+                  }
+
+                  Long truthVersion = truth.get(id);
+                  if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": truthVersion=" + truthVersion);
+
+                  boolean doIndex;
+                  if (currentVersion == missingValue) {
+                    if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": id not in RT cache");
+                    int otherDocID = lookup.lookup(new BytesRef(id), newVersion+1);
+                    if (otherDocID == -1) {
+                      if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": id not in index, or version is <= newVersion; will index");
+                      doIndex = true;
+                    } else {
+                      if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": id is in index with version=" + lookup.getVersion() + "; will not index");
+                      doIndex = false;
+                      if (truthVersion.longValue() != lookup.getVersion()) {
+                        System.out.println(Thread.currentThread() + ": now fail0!");
+                      }
+                      assertEquals(truthVersion.longValue(), lookup.getVersion());
+                    }
+                  } else {
+                    if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": id is in RT cache: currentVersion=" + currentVersion);
+                    doIndex = newVersion > currentVersion;
+                  }
+
+                  if (doIndex) {
+                    if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": now index");
+                    boolean passes = truthVersion == null || truthVersion.longValue() <= newVersion;
+                    if (passes == false) {
+                      System.out.println(Thread.currentThread() + ": now fail!");
+                    }
+                    assertTrue(passes);
+                    Document doc = new Document();
+                    doc.add(makeIDField(id, newVersion));
+                    w.updateDocument(new Term("id", id), doc);
+                    truth.put(id, newVersion);
+                    versionValues.add(id, newVersion);
+                  } else {
+                    if (VERBOSE) System.out.println(Thread.currentThread().getName() + ": skip index");
+                    assertNotNull(truthVersion);
+                    assertTrue(truthVersion.longValue() >= newVersion);
+                  }
+                } finally {
+                  mgr.release(s);
+                }
+              }
+            }
+          }
+        };
+      threads[i].start();
+    }
+
+    startingGun.countDown();
+
+    // Keep reopening the NRT reader until all indexing threads are done:
+    refreshLoop:
+    while (true) {
+      Thread.sleep(TestUtil.nextInt(random(), 1, 10));
+      mgr.maybeRefresh();
+      for (Thread thread : threads) {
+        if (thread.isAlive()) {
+          continue refreshLoop;
+        }
+      }
+
+      break;
+    }
+
+    // Verify final index against truth:
+    for(int i=0;i<2;i++) {
+      mgr.maybeRefresh();
+      IndexSearcher s = mgr.acquire();
+      try {
+        IndexReader r = s.getIndexReader();
+        // cannot assert this: maybe not all IDs were indexed
+        /*
+        assertEquals(numIDs, r.numDocs());
+        if (i == 1) {
+          // After forceMerge no deleted docs:
+          assertEquals(numIDs, r.maxDoc());
+        }
+        */
+        PerThreadVersionPKLookup lookup = new PerThreadVersionPKLookup(r, "id");
+        for(Map.Entry<String,Long> ent : truth.entrySet()) {
+          assertTrue(lookup.lookup(new BytesRef(ent.getKey()), -1L) != -1);
+          assertEquals(ent.getValue().longValue(), lookup.getVersion());
+        }
+      } finally {
+        mgr.release(s);
+      }
+
+      if (i == 1) {
+        break;
+      }
+
+      // forceMerge and verify again:
+      w.forceMerge(1);
+    }
+
+    mgr.close();
+    w.close();
+    dir.close();
+  }
 }
DefaultIndexingChain.java

@@ -94,8 +94,6 @@ final class DefaultIndexingChain extends DocConsumer {
     // aborting on any exception from this method
 
-    int numDocs = state.segmentInfo.getDocCount();
-
     // TODO: we could set liveDocs earlier and then fix DVs to also not write deleted docs:
     writeNorms(state);
     writeDocValues(state);
 
FreqProxFields.java

@@ -461,7 +461,6 @@ class FreqProxFields extends Fields {
       posLeft = freq;
       pos = 0;
       startOffset = 0;
 
       return docID;
     }
 
IndexWriter.java

@@ -1651,6 +1651,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
     if (doWait) {
       synchronized(this) {
         while(true) {
 
           if (hitOOM) {
             throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot complete forceMerge");
           }