LUCENE-4013: make oal.index.DocumentsWriter* classes package private

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1329419 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2012-04-23 20:13:57 +00:00
parent d580f9f2a6
commit 71bbc7a7b7
10 changed files with 81 additions and 35 deletions

View File

@ -40,7 +40,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
* {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
* space exhaustion. * space exhaustion.
*/ */
public final class DocumentsWriterFlushControl { final class DocumentsWriterFlushControl {
private final long hardMaxBytesPerDWPT; private final long hardMaxBytesPerDWPT;
private long activeBytes = 0; private long activeBytes = 0;

View File

@ -27,7 +27,7 @@ import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
/** /**
* @lucene.internal * @lucene.internal
*/ */
public class DocumentsWriterFlushQueue { class DocumentsWriterFlushQueue {
private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>(); private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
// we track tickets separately since count must be present even before the ticket is // we track tickets separately since count must be present even before the ticket is
// constructed ie. queue.size would not reflect it. // constructed ie. queue.size would not reflect it.

View File

@ -37,7 +37,7 @@ import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator; import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.MutableBits; import org.apache.lucene.util.MutableBits;
public class DocumentsWriterPerThread { class DocumentsWriterPerThread {
/** /**
* The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method

View File

@ -36,7 +36,7 @@ import org.apache.lucene.util.SetOnce;
* new {@link DocumentsWriterPerThread} instance. * new {@link DocumentsWriterPerThread} instance.
* </p> * </p>
*/ */
public abstract class DocumentsWriterPerThreadPool { abstract class DocumentsWriterPerThreadPool {
/** /**
* {@link ThreadState} references and guards a * {@link ThreadState} references and guards a
@ -50,7 +50,7 @@ public abstract class DocumentsWriterPerThreadPool {
* before accessing the state. * before accessing the state.
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public final static class ThreadState extends ReentrantLock { final static class ThreadState extends ReentrantLock {
DocumentsWriterPerThread dwpt; DocumentsWriterPerThread dwpt;
// TODO this should really be part of DocumentsWriterFlushControl // TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl
@ -127,7 +127,7 @@ public abstract class DocumentsWriterPerThreadPool {
/** /**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s. * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/ */
public DocumentsWriterPerThreadPool(int maxNumThreadStates) { DocumentsWriterPerThreadPool(int maxNumThreadStates) {
if (maxNumThreadStates < 1) { if (maxNumThreadStates < 1) {
throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates); throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
} }
@ -135,7 +135,7 @@ public abstract class DocumentsWriterPerThreadPool {
numThreadStatesActive = 0; numThreadStatesActive = 0;
} }
public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) { void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
this.globalFieldMap.set(globalFieldMap); this.globalFieldMap.set(globalFieldMap);
for (int i = 0; i < threadStates.length; i++) { for (int i = 0; i < threadStates.length; i++) {
@ -148,14 +148,14 @@ public abstract class DocumentsWriterPerThreadPool {
* Returns the max number of {@link ThreadState} instances available in this * Returns the max number of {@link ThreadState} instances available in this
* {@link DocumentsWriterPerThreadPool} * {@link DocumentsWriterPerThreadPool}
*/ */
public int getMaxThreadStates() { int getMaxThreadStates() {
return threadStates.length; return threadStates.length;
} }
/** /**
* Returns the active number of {@link ThreadState} instances. * Returns the active number of {@link ThreadState} instances.
*/ */
public int getActiveThreadState() { int getActiveThreadState() {
return numThreadStatesActive; return numThreadStatesActive;
} }
@ -169,7 +169,7 @@ public abstract class DocumentsWriterPerThreadPool {
* @return a new {@link ThreadState} iff any new state is available otherwise * @return a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code> * <code>null</code>
*/ */
public synchronized ThreadState newThreadState() { synchronized ThreadState newThreadState() {
if (numThreadStatesActive < threadStates.length) { if (numThreadStatesActive < threadStates.length) {
final ThreadState threadState = threadStates[numThreadStatesActive]; final ThreadState threadState = threadStates[numThreadStatesActive];
threadState.lock(); // lock so nobody else will get this ThreadState threadState.lock(); // lock so nobody else will get this ThreadState
@ -211,7 +211,7 @@ public abstract class DocumentsWriterPerThreadPool {
/** /**
* Deactivate all unreleased threadstates * Deactivate all unreleased threadstates
*/ */
protected synchronized void deactivateUnreleasedStates() { synchronized void deactivateUnreleasedStates() {
for (int i = numThreadStatesActive; i < threadStates.length; i++) { for (int i = numThreadStatesActive; i < threadStates.length; i++) {
final ThreadState threadState = threadStates[i]; final ThreadState threadState = threadStates[i];
threadState.lock(); threadState.lock();
@ -223,7 +223,7 @@ public abstract class DocumentsWriterPerThreadPool {
} }
} }
protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) { DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
assert threadState.isHeldByCurrentThread(); assert threadState.isHeldByCurrentThread();
assert globalFieldMap.get() != null; assert globalFieldMap.get() != null;
final DocumentsWriterPerThread dwpt = threadState.dwpt; final DocumentsWriterPerThread dwpt = threadState.dwpt;
@ -238,7 +238,7 @@ public abstract class DocumentsWriterPerThreadPool {
return dwpt; return dwpt;
} }
public void recycle(DocumentsWriterPerThread dwpt) { void recycle(DocumentsWriterPerThread dwpt) {
// don't recycle DWPT by default // don't recycle DWPT by default
} }
@ -266,7 +266,7 @@ public abstract class DocumentsWriterPerThreadPool {
* waiting to acquire its lock or <code>null</code> if no {@link ThreadState} * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
* is yet visible to the calling thread. * is yet visible to the calling thread.
*/ */
protected ThreadState minContendedThreadState() { ThreadState minContendedThreadState() {
ThreadState minThreadState = null; ThreadState minThreadState = null;
final int limit = numThreadStatesActive; final int limit = numThreadStatesActive;
for (int i = 0; i < limit; i++) { for (int i = 0; i < limit; i++) {

View File

@ -47,7 +47,7 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
* pending iff the global active RAM consumption is >= the configured max RAM * pending iff the global active RAM consumption is >= the configured max RAM
* buffer. * buffer.
*/ */
public class FlushByRamOrCountsPolicy extends FlushPolicy { class FlushByRamOrCountsPolicy extends FlushPolicy {
@Override @Override
public void onDelete(DocumentsWriterFlushControl control, ThreadState state) { public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {

View File

@ -50,7 +50,7 @@ import org.apache.lucene.util.SetOnce;
* @see DocumentsWriterPerThread * @see DocumentsWriterPerThread
* @see IndexWriterConfig#setFlushPolicy(FlushPolicy) * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
*/ */
public abstract class FlushPolicy { abstract class FlushPolicy {
protected final SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>(); protected final SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>();
protected IndexWriterConfig indexWriterConfig; protected IndexWriterConfig indexWriterConfig;

View File

@ -571,8 +571,8 @@ public final class IndexWriterConfig implements Cloneable {
* </p> * </p>
* <p> * <p>
* NOTE: This only takes effect when IndexWriter is first created.</p>*/ * NOTE: This only takes effect when IndexWriter is first created.</p>*/
public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) { IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
if(threadPool == null) { if (threadPool == null) {
throw new IllegalArgumentException("DocumentsWriterPerThreadPool must not be nul"); throw new IllegalArgumentException("DocumentsWriterPerThreadPool must not be nul");
} }
this.indexerThreadPool = threadPool; this.indexerThreadPool = threadPool;
@ -582,10 +582,32 @@ public final class IndexWriterConfig implements Cloneable {
/** Returns the configured {@link DocumentsWriterPerThreadPool} instance. /** Returns the configured {@link DocumentsWriterPerThreadPool} instance.
* @see #setIndexerThreadPool(DocumentsWriterPerThreadPool) * @see #setIndexerThreadPool(DocumentsWriterPerThreadPool)
* @return the configured {@link DocumentsWriterPerThreadPool} instance.*/ * @return the configured {@link DocumentsWriterPerThreadPool} instance.*/
public DocumentsWriterPerThreadPool getIndexerThreadPool() { DocumentsWriterPerThreadPool getIndexerThreadPool() {
return this.indexerThreadPool; return this.indexerThreadPool;
} }
/**
* Sets the max number of simultaneous threads that may be indexing documents
* at once in IndexWriter. Values &lt; 1 are invalid and if passed
* <code>maxThreadStates</code> will be set to
* {@link #DEFAULT_MAX_THREAD_STATES}.
*
* <p>Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
this.indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates);
return this;
}
/** Returns the max number of simultaneous threads that
* may be indexing documents at once in IndexWriter. */
public int getMaxThreadStates() {
try {
return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
} catch (ClassCastException cce) {
throw new IllegalStateException(cce);
}
}
/** By default, IndexWriter does not pool the /** By default, IndexWriter does not pool the
* SegmentReaders it must open for deletions and * SegmentReaders it must open for deletions and
* merging, unless a near-real-time reader has been * merging, unless a near-real-time reader has been

View File

@ -30,7 +30,7 @@ import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javad
* {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
* minimal contended {@link ThreadState}. * minimal contended {@link ThreadState}.
*/ */
public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool { class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>(); private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
/** /**

View File

@ -20,7 +20,7 @@ import java.util.Random;
/** /**
* *
* A {@link DocumentsWriterPerThreadPool} that selects thread states at random. * A <code>DocumentsWriterPerThreadPool<code> that selects thread states at random.
* *
* @lucene.internal * @lucene.internal
* @lucene.experimental * @lucene.experimental

View File

@ -26,6 +26,7 @@ import java.lang.annotation.Inherited;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -35,8 +36,8 @@ import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
@ -53,11 +54,11 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType; import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.AtomicReader; import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.CompositeReader; import org.apache.lucene.index.CompositeReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldFilterAtomicReader; import org.apache.lucene.index.FieldFilterAtomicReader;
import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReader.ReaderClosedListener; import org.apache.lucene.index.IndexReader.ReaderClosedListener;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.LogDocMergePolicy; import org.apache.lucene.index.LogDocMergePolicy;
@ -70,25 +71,24 @@ import org.apache.lucene.index.RandomDocumentsWriterPerThreadPool;
import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SerialMergeScheduler; import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SlowCompositeReaderWrapper; import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.ThreadAffinityDocumentsWriterThreadPool;
import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.FieldCache.CacheEntry; import org.apache.lucene.search.FieldCache.CacheEntry;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
import org.apache.lucene.search.RandomSimilarityProvider; import org.apache.lucene.search.RandomSimilarityProvider;
import org.apache.lucene.search.similarities.DefaultSimilarity; import org.apache.lucene.search.similarities.DefaultSimilarity;
import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.QueryUtils.FCInvisibleMultiReader;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MergeInfo; import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling; import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.FieldCacheSanityChecker.Insanity; import org.apache.lucene.util.FieldCacheSanityChecker.Insanity;
import org.junit.After; import org.junit.After;
@ -108,7 +108,6 @@ import org.junit.runner.Runner;
import org.junit.runner.notification.RunListener; import org.junit.runner.notification.RunListener;
import org.junit.runners.model.MultipleFailureException; import org.junit.runners.model.MultipleFailureException;
import org.junit.runners.model.Statement; import org.junit.runners.model.Statement;
import com.carrotsearch.randomizedtesting.JUnit4MethodProvider; import com.carrotsearch.randomizedtesting.JUnit4MethodProvider;
import com.carrotsearch.randomizedtesting.MixWithSuiteName; import com.carrotsearch.randomizedtesting.MixWithSuiteName;
import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.RandomizedContext;
@ -1068,12 +1067,37 @@ public abstract class LuceneTestCase extends Assert {
if (r.nextBoolean()) { if (r.nextBoolean()) {
int maxNumThreadStates = rarely(r) ? _TestUtil.nextInt(r, 5, 20) // crazy value int maxNumThreadStates = rarely(r) ? _TestUtil.nextInt(r, 5, 20) // crazy value
: _TestUtil.nextInt(r, 1, 4); // reasonable value : _TestUtil.nextInt(r, 1, 4); // reasonable value
if (rarely(r)) {
// random thread pool Method setIndexerThreadPoolMethod = null;
c.setIndexerThreadPool(new RandomDocumentsWriterPerThreadPool(maxNumThreadStates, r)); try {
} else { // Retrieve the package-private setIndexerThreadPool
// random thread pool // method:
c.setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxNumThreadStates)); for(Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getName().equals("setIndexerThreadPool")) {
m.setAccessible(true);
setIndexerThreadPoolMethod = m;
break;
}
}
} catch (Exception e) {
// Should not happen?
throw new RuntimeException(e);
}
if (setIndexerThreadPoolMethod == null) {
throw new RuntimeException("failed to lookup IndexWriterConfig.setIndexerThreadPool method");
}
try {
if (rarely(r)) {
// random thread pool
setIndexerThreadPoolMethod.invoke(c, new RandomDocumentsWriterPerThreadPool(maxNumThreadStates, r));
} else {
// random thread pool
c.setMaxThreadStates(maxNumThreadStates);
}
} catch (Exception e) {
throw new RuntimeException(e);
} }
} }