mirror of https://github.com/apache/lucene.git
LUCENE-5644: add test case
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1593023 13f79535-47bb-0310-9956-ffa450edef68
parent 4a84567693
commit abde483cd4
@@ -0,0 +1,230 @@
package org.apache.lucene.index;

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;

public class TestIndexWriterThreadsToSegments extends LuceneTestCase {

  // LUCENE-5644: for first segment, two threads each indexed one doc (likely concurrently), but for second segment, each thread indexed the
  // doc NOT at the same time, and should have shared the same thread state / segment
  public void testSegmentCountOnFlushBasic() throws Exception {
    Directory dir = newDirectory();
    final IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())));
    final CountDownLatch startingGun = new CountDownLatch(1);
    final CountDownLatch startDone = new CountDownLatch(2);
    final CountDownLatch middleGun = new CountDownLatch(1);
    final CountDownLatch finalGun = new CountDownLatch(1);
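    // The latches choreograph two rounds of indexing: startingGun releases both threads to add
    // their first doc (possibly concurrently, so that flush may produce 1 or 2 segments), while
    // middleGun and finalGun force the second docs to be added strictly one after the other, so
    // both additions should reuse a single thread state and flush as exactly one new segment.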
    Thread[] threads = new Thread[2];
    for(int i=0;i<threads.length;i++) {
      final int threadID = i;
      threads[i] = new Thread() {
          @Override
          public void run() {
            try {
              startingGun.await();
              Document doc = new Document();
              doc.add(newTextField("field", "here is some text", Field.Store.NO));
              w.addDocument(doc);
              startDone.countDown();

              middleGun.await();
              if (threadID == 0) {
                w.addDocument(doc);
              } else {
                finalGun.await();
                w.addDocument(doc);
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        };
      threads[i].start();
    }

    startingGun.countDown();
    startDone.await();
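
    // Opening an NRT reader (DirectoryReader.open(w, true)) forces the buffered docs to be
    // flushed into new segment(s) without committing; that is how this test triggers flushes.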
    IndexReader r = DirectoryReader.open(w, true);
    assertEquals(2, r.numDocs());
    int numSegments = r.leaves().size();
    // 1 segment if the threads ran sequentially, else 2:
    assertTrue(numSegments <= 2);
    r.close();

    middleGun.countDown();
    threads[0].join();

    finalGun.countDown();
    threads[1].join();

    r = DirectoryReader.open(w, true);
    assertEquals(4, r.numDocs());
    // Both threads should have shared a single thread state since they did not try to index concurrently:
    assertEquals(1+numSegments, r.leaves().size());
    r.close();

    w.close();
    dir.close();
  }

  /** Maximum number of simultaneous threads to use for each iteration. */
  private static final int MAX_THREADS_AT_ONCE = 10;

  static class CheckSegmentCount implements Runnable, Closeable {
    private final IndexWriter w;
    private final AtomicInteger maxThreadCountPerIter;
    private final AtomicInteger indexingCount;
    private DirectoryReader r;

    public CheckSegmentCount(IndexWriter w, AtomicInteger maxThreadCountPerIter, AtomicInteger indexingCount) throws IOException {
      this.w = w;
      this.maxThreadCountPerIter = maxThreadCountPerIter;
      this.indexingCount = indexingCount;
      r = DirectoryReader.open(w, true);
      assertEquals(0, r.leaves().size());
      setNextIterThreadCount();
    }

    @Override
    public void run() {
      try {
        int oldSegmentCount = r.leaves().size();
        DirectoryReader r2 = DirectoryReader.openIfChanged(r);
        assertNotNull(r2);
        r.close();
        r = r2;
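        // Each thread state that indexed during the iteration flushes to at most one new segment
        // when the NRT reader is reopened, so the growth in segment count is bounded by
        // min(configured max thread states, threads permitted to index this iteration):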
        int maxThreadStates = w.getConfig().getMaxThreadStates();
        int maxExpectedSegments = oldSegmentCount + Math.min(maxThreadStates, maxThreadCountPerIter.get());
        if (VERBOSE) {
          System.out.println("TEST: iter done; now verify oldSegCount=" + oldSegmentCount + " newSegCount=" + r2.leaves().size() + " maxExpected=" + maxExpectedSegments);
        }
        // NOTE: it won't necessarily be ==, in case some threads were strangely scheduled and never conflicted with one another (should be uncommon...?):
        assertTrue(r.leaves().size() <= maxExpectedSegments);
        setNextIterThreadCount();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    private void setNextIterThreadCount() {
      indexingCount.set(0);
      maxThreadCountPerIter.set(TestUtil.nextInt(random(), 1, MAX_THREADS_AT_ONCE));
      if (VERBOSE) {
        System.out.println("TEST: iter set maxThreadCount=" + maxThreadCountPerIter.get());
      }
    }

    @Override
    public void close() throws IOException {
      r.close();
      r = null;
    }
  }

  // LUCENE-5644: index docs w/ multiple threads but in between flushes we limit how many threads can index concurrently in the next
  // iteration, and then verify that no more segments were flushed than number of threads:
  public void testSegmentCountOnFlushRandom() throws Exception {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));

    int maxThreadStates = TestUtil.nextInt(random(), 1, 12);

    if (VERBOSE) {
      System.out.println("TEST: maxThreadStates=" + maxThreadStates);
    }

    // Never trigger flushes (so we only flush on getReader):
    iwc.setMaxBufferedDocs(100000000);
    iwc.setRAMBufferSizeMB(-1);
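    // -1 is IndexWriterConfig.DISABLE_AUTO_FLUSH: with both the doc-count and RAM triggers
    // disabled, segments are only written when the NRT reader is reopened between iterations.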
    iwc.setMaxThreadStates(maxThreadStates);

    // Never trigger merges (so we can simplistically count flushed segments):
    iwc.setMergePolicy(NoMergePolicy.INSTANCE);

    final IndexWriter w = new IndexWriter(dir, iwc);

    // How many threads are indexing in the current cycle:
    final AtomicInteger indexingCount = new AtomicInteger();

    // How many threads we will use on each cycle:
    final AtomicInteger maxThreadCount = new AtomicInteger();

    CheckSegmentCount checker = new CheckSegmentCount(w, maxThreadCount, indexingCount);

    // We spin up 10 threads up front, but then in between flushes we limit how many can run on each iteration
    final int ITERS = 100;
    Thread[] threads = new Thread[MAX_THREADS_AT_ONCE];

    // We use this to stop all threads once they've indexed their docs in the current iter, and pull a new NRT reader, and verify the
    // segment count:
    final CyclicBarrier barrier = new CyclicBarrier(MAX_THREADS_AT_ONCE, checker);
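    // The barrier action (checker) runs exactly once per iteration, after all MAX_THREADS_AT_ONCE
    // threads have arrived and before any of them is released into the next iteration, so the
    // segment count is verified while no thread is indexing.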

    for(int i=0;i<threads.length;i++) {
      threads[i] = new Thread() {
          @Override
          public void run() {
            try {
              for(int iter=0;iter<ITERS;iter++) {
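                // The checker resets indexingCount to 0 before each iteration (in its constructor
                // and at every barrier); only the first maxThreadCount threads to increment it get
                // to index on this cycle, the rest fall through to the barrier.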
                if (indexingCount.incrementAndGet() <= maxThreadCount.get()) {
                  if (VERBOSE) {
                    System.out.println("TEST: " + Thread.currentThread().getName() + ": do index");
                  }

                  // We get to index on this cycle:
                  Document doc = new Document();
                  doc.add(newTextField("field", "here is some text that is a bit longer than normal trivial text", Field.Store.NO));
                  for(int j=0;j<200;j++) {
                    w.addDocument(doc);
                  }
                } else {
                  // We lose: no indexing for us on this cycle
                  if (VERBOSE) {
                    System.out.println("TEST: " + Thread.currentThread().getName() + ": don't index");
                  }
                }
                barrier.await();
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        };
      threads[i].start();
    }

    for(int i=0;i<threads.length;i++) {
      threads[i].join();
    }

    IOUtils.close(checker, w, dir);
  }
}