mirror of https://github.com/apache/lucene.git
LUCENE-3773: minor improvements to DWPTThreadPool
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1243268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f68149b978
commit
d5056929d0
|
@ -19,7 +19,6 @@ package org.apache.lucene.index;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -213,18 +212,18 @@ final class DocumentsWriter {
|
|||
infoStream.message("DW", "abort");
|
||||
}
|
||||
|
||||
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
|
||||
while (threadsIterator.hasNext()) {
|
||||
final ThreadState perThread = threadsIterator.next();
|
||||
final int limit = perThreadPool.getActiveThreadState();
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState perThread = perThreadPool.getThreadState(i);
|
||||
perThread.lock();
|
||||
try {
|
||||
if (perThread.isActive()) { // we might be closed
|
||||
try {
|
||||
perThread.perThread.abort();
|
||||
perThread.dwpt.abort();
|
||||
} catch (IOException ex) {
|
||||
// continue
|
||||
} finally {
|
||||
perThread.perThread.checkAndResetHasAborted();
|
||||
perThread.dwpt.checkAndResetHasAborted();
|
||||
flushControl.doOnAbort(perThread);
|
||||
}
|
||||
} else {
|
||||
|
@ -338,7 +337,7 @@ final class DocumentsWriter {
|
|||
assert false: "perThread is not active but we are still open";
|
||||
}
|
||||
|
||||
final DocumentsWriterPerThread dwpt = perThread.perThread;
|
||||
final DocumentsWriterPerThread dwpt = perThread.dwpt;
|
||||
try {
|
||||
final int docCount = dwpt.updateDocuments(docs, analyzer, delTerm);
|
||||
numDocsInRAM.addAndGet(docCount);
|
||||
|
@ -372,7 +371,7 @@ final class DocumentsWriter {
|
|||
assert false: "perThread is not active but we are still open";
|
||||
}
|
||||
|
||||
final DocumentsWriterPerThread dwpt = perThread.perThread;
|
||||
final DocumentsWriterPerThread dwpt = perThread.dwpt;
|
||||
try {
|
||||
dwpt.updateDocument(doc, analyzer, delTerm);
|
||||
numDocsInRAM.incrementAndGet();
|
||||
|
@ -587,22 +586,4 @@ final class DocumentsWriter {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// use by IW during close to assert all DWPT are inactive after final flush
|
||||
boolean assertNoActiveDWPT() {
|
||||
Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();
|
||||
while(activePerThreadsIterator.hasNext()) {
|
||||
ThreadState next = activePerThreadsIterator.next();
|
||||
next.lock();
|
||||
try {
|
||||
assert !next.isActive();
|
||||
} finally {
|
||||
next.unlock();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ package org.apache.lucene.index;
|
|||
*/
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
|
@ -51,6 +51,8 @@ public final class DocumentsWriterFlushControl {
|
|||
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
|
||||
// only for safety reasons if a DWPT is close to the RAM limit
|
||||
private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
|
||||
private final IdentityHashMap<DocumentsWriterPerThread, Long> flushingWriters = new IdentityHashMap<DocumentsWriterPerThread, Long>();
|
||||
|
||||
|
||||
double maxConfiguredRamBuffer = 0;
|
||||
long peakActiveBytes = 0;// only with assert
|
||||
|
@ -61,7 +63,6 @@ public final class DocumentsWriterFlushControl {
|
|||
private final DocumentsWriterPerThreadPool perThreadPool;
|
||||
private final FlushPolicy flushPolicy;
|
||||
private boolean closed = false;
|
||||
private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
|
||||
private final DocumentsWriter documentsWriter;
|
||||
private final IndexWriterConfig config;
|
||||
|
||||
|
@ -122,7 +123,7 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
|
||||
private void commitPerThreadBytes(ThreadState perThread) {
|
||||
final long delta = perThread.perThread.bytesUsed()
|
||||
final long delta = perThread.dwpt.bytesUsed()
|
||||
- perThread.bytesUsed;
|
||||
perThread.bytesUsed += delta;
|
||||
/*
|
||||
|
@ -212,7 +213,7 @@ public final class DocumentsWriterFlushControl {
|
|||
*/
|
||||
public synchronized void setFlushPending(ThreadState perThread) {
|
||||
assert !perThread.flushPending;
|
||||
if (perThread.perThread.getNumDocsInRAM() > 0) {
|
||||
if (perThread.dwpt.getNumDocsInRAM() > 0) {
|
||||
perThread.flushPending = true; // write access synced
|
||||
final long bytes = perThread.bytesUsed;
|
||||
flushBytes += bytes;
|
||||
|
@ -295,18 +296,21 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
|
||||
DocumentsWriterPerThread nextPendingFlush() {
|
||||
int numPending;
|
||||
boolean fullFlush;
|
||||
synchronized (this) {
|
||||
final DocumentsWriterPerThread poll;
|
||||
if ((poll = flushQueue.poll()) != null) {
|
||||
stallControl.updateStalled(this);
|
||||
return poll;
|
||||
}
|
||||
fullFlush = this.fullFlush;
|
||||
numPending = this.numPending;
|
||||
}
|
||||
if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
|
||||
final Iterator<ThreadState> allActiveThreads = perThreadPool
|
||||
.getActivePerThreadsIterator();
|
||||
while (allActiveThreads.hasNext() && numPending > 0) {
|
||||
ThreadState next = allActiveThreads.next();
|
||||
final int limit = perThreadPool.getActiveThreadState();
|
||||
for (int i = 0; i < limit && numPending > 0; i++) {
|
||||
final ThreadState next = perThreadPool.getThreadState(i);
|
||||
if (next.flushPending) {
|
||||
final DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next);
|
||||
if (dwpt != null) {
|
||||
|
@ -327,9 +331,29 @@ public final class DocumentsWriterFlushControl {
|
|||
/**
|
||||
* Returns an iterator that provides access to all currently active {@link ThreadState}s
|
||||
*/
|
||||
public Iterator<ThreadState> allActiveThreads() {
|
||||
return perThreadPool.getActivePerThreadsIterator();
|
||||
public Iterator<ThreadState> allActiveThreadStates() {
|
||||
return getPerThreadsIterator(perThreadPool.getActiveThreadState());
|
||||
}
|
||||
|
||||
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
|
||||
return new Iterator<ThreadState>() {
|
||||
int i = 0;
|
||||
|
||||
public boolean hasNext() {
|
||||
return i < upto;
|
||||
}
|
||||
|
||||
public ThreadState next() {
|
||||
return perThreadPool.getThreadState(i++);
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("remove() not supported.");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
|
||||
synchronized void doOnDelete() {
|
||||
// pass null this is a global delete no update
|
||||
|
@ -369,7 +393,7 @@ public final class DocumentsWriterFlushControl {
|
|||
boolean success = false;
|
||||
try {
|
||||
if (perThread.isActive()
|
||||
&& perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
|
||||
&& perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
|
||||
// There is a flush-all in process and this DWPT is
|
||||
// now stale -- enroll it for flush and try for
|
||||
// another DWPT:
|
||||
|
@ -397,23 +421,23 @@ public final class DocumentsWriterFlushControl {
|
|||
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
|
||||
documentsWriter.deleteQueue = newQueue;
|
||||
}
|
||||
final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
|
||||
while (allActiveThreads.hasNext()) {
|
||||
final ThreadState next = allActiveThreads.next();
|
||||
final int limit = perThreadPool.getActiveThreadState();
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState next = perThreadPool.getThreadState(i);
|
||||
next.lock();
|
||||
try {
|
||||
if (!next.isActive()) {
|
||||
continue;
|
||||
}
|
||||
assert next.perThread.deleteQueue == flushingQueue
|
||||
|| next.perThread.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
|
||||
assert next.dwpt.deleteQueue == flushingQueue
|
||||
|| next.dwpt.deleteQueue == documentsWriter.deleteQueue : " flushingQueue: "
|
||||
+ flushingQueue
|
||||
+ " currentqueue: "
|
||||
+ documentsWriter.deleteQueue
|
||||
+ " perThread queue: "
|
||||
+ next.perThread.deleteQueue
|
||||
+ " numDocsInRam: " + next.perThread.getNumDocsInRAM();
|
||||
if (next.perThread.deleteQueue != flushingQueue) {
|
||||
+ next.dwpt.deleteQueue
|
||||
+ " numDocsInRam: " + next.dwpt.getNumDocsInRAM();
|
||||
if (next.dwpt.deleteQueue != flushingQueue) {
|
||||
// this one is already a new DWPT
|
||||
continue;
|
||||
}
|
||||
|
@ -437,12 +461,12 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
|
||||
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
|
||||
final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
|
||||
while (allActiveThreads.hasNext()) {
|
||||
final ThreadState next = allActiveThreads.next();
|
||||
final int limit = perThreadPool.getActiveThreadState();
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState next = perThreadPool.getThreadState(i);
|
||||
next.lock();
|
||||
try {
|
||||
assert !next.isActive() || next.perThread.deleteQueue == queue;
|
||||
assert !next.isActive() || next.dwpt.deleteQueue == queue;
|
||||
} finally {
|
||||
next.unlock();
|
||||
}
|
||||
|
@ -454,9 +478,9 @@ public final class DocumentsWriterFlushControl {
|
|||
|
||||
void addFlushableState(ThreadState perThread) {
|
||||
if (documentsWriter.infoStream.isEnabled("DWFC")) {
|
||||
documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
|
||||
documentsWriter.infoStream.message("DWFC", Thread.currentThread().getName() + ": addFlushableState " + perThread.dwpt);
|
||||
}
|
||||
final DocumentsWriterPerThread dwpt = perThread.perThread;
|
||||
final DocumentsWriterPerThread dwpt = perThread.dwpt;
|
||||
assert perThread.isHeldByCurrentThread();
|
||||
assert perThread.isActive();
|
||||
assert fullFlush;
|
||||
|
@ -473,9 +497,9 @@ public final class DocumentsWriterFlushControl {
|
|||
}
|
||||
} else {
|
||||
if (closed) {
|
||||
perThread.resetWriter(null); // make this state inactive
|
||||
perThreadPool.deactivateThreadState(perThread); // make this state inactive
|
||||
} else {
|
||||
dwpt.initialize();
|
||||
perThreadPool.reinitThreadState(perThread);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -597,4 +621,6 @@ public final class DocumentsWriterFlushControl {
|
|||
boolean anyStalledThreads() {
|
||||
return stallControl.anyStalledThreads();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ package org.apache.lucene.index;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
|
||||
|
@ -38,11 +37,6 @@ import org.apache.lucene.util.SetOnce;
|
|||
* </p>
|
||||
*/
|
||||
public abstract class DocumentsWriterPerThreadPool {
|
||||
/** The maximum number of simultaneous threads that may be
|
||||
* indexing documents at once in IndexWriter; if more
|
||||
* than this many threads arrive they will wait for
|
||||
* others to finish. */
|
||||
public final static int DEFAULT_MAX_THREAD_STATES = 8;
|
||||
|
||||
/**
|
||||
* {@link ThreadState} references and guards a
|
||||
|
@ -57,17 +51,18 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public final static class ThreadState extends ReentrantLock {
|
||||
// package private for FlushPolicy
|
||||
DocumentsWriterPerThread perThread;
|
||||
DocumentsWriterPerThread dwpt;
|
||||
// TODO this should really be part of DocumentsWriterFlushControl
|
||||
// write access guarded by DocumentsWriterFlushControl
|
||||
volatile boolean flushPending = false;
|
||||
// TODO this should really be part of DocumentsWriterFlushControl
|
||||
// write access guarded by DocumentsWriterFlushControl
|
||||
long bytesUsed = 0;
|
||||
// guarded by Reentrant lock
|
||||
private boolean isActive = true;
|
||||
|
||||
ThreadState(DocumentsWriterPerThread perThread) {
|
||||
this.perThread = perThread;
|
||||
ThreadState(DocumentsWriterPerThread dpwt) {
|
||||
this.dwpt = dpwt;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,12 +71,12 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
* for indexing anymore.
|
||||
* @see #isActive()
|
||||
*/
|
||||
void resetWriter(DocumentsWriterPerThread perThread) {
|
||||
private void resetWriter(DocumentsWriterPerThread dwpt) {
|
||||
assert this.isHeldByCurrentThread();
|
||||
if (perThread == null) {
|
||||
if (dwpt == null) {
|
||||
isActive = false;
|
||||
}
|
||||
this.perThread = perThread;
|
||||
this.dwpt = dwpt;
|
||||
this.bytesUsed = 0;
|
||||
this.flushPending = false;
|
||||
}
|
||||
|
@ -112,7 +107,7 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
public DocumentsWriterPerThread getDocumentsWriterPerThread() {
|
||||
assert this.isHeldByCurrentThread();
|
||||
// public for FlushPolicy
|
||||
return perThread;
|
||||
return dwpt;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,40 +119,37 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
}
|
||||
}
|
||||
|
||||
private final ThreadState[] perThreads;
|
||||
private final ThreadState[] threadStates;
|
||||
private volatile int numThreadStatesActive;
|
||||
private FieldNumberBiMap globalFieldMap;
|
||||
private final SetOnce<FieldNumberBiMap> globalFieldMap = new SetOnce<FieldNumberBiMap>();
|
||||
private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
|
||||
|
||||
/**
|
||||
* Creates a new {@link DocumentsWriterPerThreadPool} with max.
|
||||
* {@link #DEFAULT_MAX_THREAD_STATES} thread states.
|
||||
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
|
||||
*/
|
||||
public DocumentsWriterPerThreadPool() {
|
||||
this(DEFAULT_MAX_THREAD_STATES);
|
||||
}
|
||||
|
||||
public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
|
||||
maxNumPerThreads = (maxNumPerThreads < 1) ? DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
|
||||
perThreads = new ThreadState[maxNumPerThreads];
|
||||
public DocumentsWriterPerThreadPool(int maxNumThreadStates) {
|
||||
if (maxNumThreadStates < 1) {
|
||||
throw new IllegalArgumentException("maxNumThreadStates must be >= 1 but was: " + maxNumThreadStates);
|
||||
}
|
||||
threadStates = new ThreadState[maxNumThreadStates];
|
||||
numThreadStatesActive = 0;
|
||||
}
|
||||
|
||||
public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
|
||||
this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
|
||||
this.globalFieldMap = globalFieldMap;
|
||||
for (int i = 0; i < perThreads.length; i++) {
|
||||
this.globalFieldMap.set(globalFieldMap);
|
||||
for (int i = 0; i < threadStates.length; i++) {
|
||||
final FieldInfos infos = new FieldInfos(globalFieldMap);
|
||||
perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
|
||||
threadStates[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the max number of {@link ThreadState} instances available in this
|
||||
* {@link DocumentsWriterPerThreadPool}
|
||||
*/
|
||||
public int getMaxThreadStates() {
|
||||
return perThreads.length;
|
||||
return threadStates.length;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -178,16 +170,16 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
* <code>null</code>
|
||||
*/
|
||||
public synchronized ThreadState newThreadState() {
|
||||
if (numThreadStatesActive < perThreads.length) {
|
||||
final ThreadState threadState = perThreads[numThreadStatesActive];
|
||||
if (numThreadStatesActive < threadStates.length) {
|
||||
final ThreadState threadState = threadStates[numThreadStatesActive];
|
||||
threadState.lock(); // lock so nobody else will get this ThreadState
|
||||
boolean unlock = true;
|
||||
try {
|
||||
if (threadState.isActive()) {
|
||||
// unreleased thread states are deactivated during DW#close()
|
||||
numThreadStatesActive++; // increment will publish the ThreadState
|
||||
assert threadState.perThread != null;
|
||||
threadState.perThread.initialize();
|
||||
assert threadState.dwpt != null;
|
||||
threadState.dwpt.initialize();
|
||||
unlock = false;
|
||||
return threadState;
|
||||
}
|
||||
|
@ -205,12 +197,12 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
}
|
||||
|
||||
private synchronized boolean assertUnreleasedThreadStatesInactive() {
|
||||
for (int i = numThreadStatesActive; i < perThreads.length; i++) {
|
||||
assert perThreads[i].tryLock() : "unreleased threadstate should not be locked";
|
||||
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
|
||||
assert threadStates[i].tryLock() : "unreleased threadstate should not be locked";
|
||||
try {
|
||||
assert !perThreads[i].isActive() : "expected unreleased thread state to be inactive";
|
||||
assert !threadStates[i].isActive() : "expected unreleased thread state to be inactive";
|
||||
} finally {
|
||||
perThreads[i].unlock();
|
||||
threadStates[i].unlock();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -220,8 +212,8 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
* Deactivate all unreleased threadstates
|
||||
*/
|
||||
protected synchronized void deactivateUnreleasedStates() {
|
||||
for (int i = numThreadStatesActive; i < perThreads.length; i++) {
|
||||
final ThreadState threadState = perThreads[i];
|
||||
for (int i = numThreadStatesActive; i < threadStates.length; i++) {
|
||||
final ThreadState threadState = threadStates[i];
|
||||
threadState.lock();
|
||||
try {
|
||||
threadState.resetWriter(null);
|
||||
|
@ -233,9 +225,10 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
|
||||
protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
|
||||
assert threadState.isHeldByCurrentThread();
|
||||
final DocumentsWriterPerThread dwpt = threadState.perThread;
|
||||
assert globalFieldMap.get() != null;
|
||||
final DocumentsWriterPerThread dwpt = threadState.dwpt;
|
||||
if (!closed) {
|
||||
final FieldInfos infos = new FieldInfos(globalFieldMap);
|
||||
final FieldInfos infos = new FieldInfos(globalFieldMap.get());
|
||||
final DocumentsWriterPerThread newDwpt = new DocumentsWriterPerThread(dwpt, infos);
|
||||
newDwpt.initialize();
|
||||
threadState.resetWriter(newDwpt);
|
||||
|
@ -251,45 +244,19 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
|
||||
public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
|
||||
|
||||
|
||||
/**
|
||||
* Returns an iterator providing access to all {@link ThreadState}
|
||||
* instances.
|
||||
*/
|
||||
// TODO: new Iterator per indexed doc is overkill...?
|
||||
public Iterator<ThreadState> getAllPerThreadsIterator() {
|
||||
return getPerThreadsIterator(this.perThreads.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an iterator providing access to all active {@link ThreadState}
|
||||
* instances.
|
||||
* <p>
|
||||
* Note: The returned iterator will only iterator
|
||||
* {@link ThreadState}s that are active at the point in time when this method
|
||||
* has been called.
|
||||
* Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
|
||||
* given ord.
|
||||
*
|
||||
* @param ord
|
||||
* the ordinal of the {@link ThreadState}
|
||||
* @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the
|
||||
* given ord.
|
||||
*/
|
||||
// TODO: new Iterator per indexed doc is overkill...?
|
||||
public Iterator<ThreadState> getActivePerThreadsIterator() {
|
||||
return getPerThreadsIterator(numThreadStatesActive);
|
||||
}
|
||||
|
||||
private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
|
||||
return new Iterator<ThreadState>() {
|
||||
int i = 0;
|
||||
|
||||
public boolean hasNext() {
|
||||
return i < upto;
|
||||
}
|
||||
|
||||
public ThreadState next() {
|
||||
return perThreads[i++];
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("remove() not supported.");
|
||||
}
|
||||
};
|
||||
ThreadState getThreadState(int ord) {
|
||||
assert ord < numThreadStatesActive;
|
||||
return threadStates[ord];
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -299,14 +266,59 @@ public abstract class DocumentsWriterPerThreadPool {
|
|||
*/
|
||||
protected ThreadState minContendedThreadState() {
|
||||
ThreadState minThreadState = null;
|
||||
// TODO: new Iterator per indexed doc is overkill...?
|
||||
final Iterator<ThreadState> it = getActivePerThreadsIterator();
|
||||
while (it.hasNext()) {
|
||||
final ThreadState state = it.next();
|
||||
final int limit = numThreadStatesActive;
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState state = threadStates[i];
|
||||
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
|
||||
minThreadState = state;
|
||||
}
|
||||
}
|
||||
return minThreadState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of currently deactivated {@link ThreadState} instances.
|
||||
* A deactivated {@link ThreadState} should not be used for indexing anymore.
|
||||
*
|
||||
* @return the number of currently deactivated {@link ThreadState} instances.
|
||||
*/
|
||||
int numDeactivatedThreadStates() {
|
||||
int count = 0;
|
||||
for (int i = 0; i < threadStates.length; i++) {
|
||||
final ThreadState threadState = threadStates[i];
|
||||
threadState.lock();
|
||||
try {
|
||||
if (!threadState.isActive) {
|
||||
count++;
|
||||
}
|
||||
} finally {
|
||||
threadState.unlock();
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can
|
||||
* not be used for indexing anymore once they are deactivated. This method should only be used
|
||||
* if the parent {@link DocumentsWriter} is closed or aborted.
|
||||
*
|
||||
* @param threadState the state to deactivate
|
||||
*/
|
||||
void deactivateThreadState(ThreadState threadState) {
|
||||
assert threadState.isActive();
|
||||
threadState.resetWriter(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reinitialized an active {@link ThreadState}. A {@link ThreadState} should
|
||||
* only be reinitialized if it is active without any pending documents.
|
||||
*
|
||||
* @param threadState the state to reinitialize
|
||||
*/
|
||||
void reinitThreadState(ThreadState threadState) {
|
||||
assert threadState.isActive;
|
||||
assert threadState.dwpt.getNumDocsInRAM() == 0;
|
||||
threadState.dwpt.initialize();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ public class FlushByRamOrCountsPolicy extends FlushPolicy {
|
|||
@Override
|
||||
public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
|
||||
if (flushOnDocCount()
|
||||
&& state.perThread.getNumDocsInRAM() >= indexWriterConfig
|
||||
&& state.dwpt.getNumDocsInRAM() >= indexWriterConfig
|
||||
.getMaxBufferedDocs()) {
|
||||
// Flush this state by num docs
|
||||
control.setFlushPending(state);
|
||||
|
|
|
@ -75,9 +75,7 @@ public abstract class FlushPolicy {
|
|||
*/
|
||||
public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
|
||||
onInsert(control, state);
|
||||
if (!state.flushPending) {
|
||||
onDelete(control, state);
|
||||
}
|
||||
onDelete(control, state);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -107,17 +105,17 @@ public abstract class FlushPolicy {
|
|||
*/
|
||||
protected ThreadState findLargestNonPendingWriter(
|
||||
DocumentsWriterFlushControl control, ThreadState perThreadState) {
|
||||
assert perThreadState.perThread.getNumDocsInRAM() > 0;
|
||||
assert perThreadState.dwpt.getNumDocsInRAM() > 0;
|
||||
long maxRamSoFar = perThreadState.bytesUsed;
|
||||
// the dwpt which needs to be flushed eventually
|
||||
ThreadState maxRamUsingThreadState = perThreadState;
|
||||
assert !perThreadState.flushPending : "DWPT should have flushed";
|
||||
Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreads();
|
||||
Iterator<ThreadState> activePerThreadsIterator = control.allActiveThreadStates();
|
||||
while (activePerThreadsIterator.hasNext()) {
|
||||
ThreadState next = activePerThreadsIterator.next();
|
||||
if (!next.flushPending) {
|
||||
final long nextRam = next.bytesUsed;
|
||||
if (nextRam > maxRamSoFar && next.perThread.getNumDocsInRAM() > 0) {
|
||||
if (nextRam > maxRamSoFar && next.dwpt.getNumDocsInRAM() > 0) {
|
||||
maxRamSoFar = nextRam;
|
||||
maxRamUsingThreadState = next;
|
||||
}
|
||||
|
|
|
@ -1144,7 +1144,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
|
|||
synchronized(this) {
|
||||
closed = true;
|
||||
}
|
||||
assert oldWriter.assertNoActiveDWPT();
|
||||
assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
|
||||
} catch (OutOfMemoryError oom) {
|
||||
handleOOM(oom, "closeInternal");
|
||||
} finally {
|
||||
|
|
|
@ -94,6 +94,13 @@ public final class IndexWriterConfig implements Cloneable {
|
|||
|
||||
/** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
|
||||
public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
|
||||
|
||||
/** The maximum number of simultaneous threads that may be
|
||||
* indexing documents at once in IndexWriter; if more
|
||||
* than this many threads arrive they will wait for
|
||||
* others to finish. Default value is 8. */
|
||||
public final static int DEFAULT_MAX_THREAD_STATES = 8;
|
||||
|
||||
/**
|
||||
* Sets the default (for any instance) maximum time to wait for a write lock
|
||||
* (in milliseconds).
|
||||
|
@ -172,7 +179,7 @@ public final class IndexWriterConfig implements Cloneable {
|
|||
}
|
||||
flushPolicy = new FlushByRamOrCountsPolicy();
|
||||
readerPooling = DEFAULT_READER_POOLING;
|
||||
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
|
||||
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
|
||||
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
|
||||
perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
|
||||
}
|
||||
|
@ -554,8 +561,8 @@ public final class IndexWriterConfig implements Cloneable {
|
|||
* IndexWriter to assign thread-states to incoming indexing threads. If no
|
||||
* {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
|
||||
* {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
|
||||
* thread-states set to {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
|
||||
* {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
|
||||
* thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
|
||||
* {@link #DEFAULT_MAX_THREAD_STATES}).
|
||||
* </p>
|
||||
* <p>
|
||||
* NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
|
||||
|
|
|
@ -34,13 +34,8 @@ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerT
|
|||
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
|
||||
|
||||
/**
|
||||
* Creates a new {@link DocumentsWriterPerThreadPool} with max.
|
||||
* {@link #DEFAULT_MAX_THREAD_STATES} thread states.
|
||||
* Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
|
||||
*/
|
||||
public ThreadAffinityDocumentsWriterThreadPool() {
|
||||
this(DEFAULT_MAX_THREAD_STATES);
|
||||
}
|
||||
|
||||
public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
|
||||
super(maxNumPerThreads);
|
||||
assert getMaxThreadStates() >= 1;
|
||||
|
|
|
@ -281,10 +281,10 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
}
|
||||
|
||||
protected void assertActiveBytesAfter(DocumentsWriterFlushControl flushControl) {
|
||||
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
|
||||
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
|
||||
long bytesUsed = 0;
|
||||
while (allActiveThreads.hasNext()) {
|
||||
bytesUsed += allActiveThreads.next().perThread.bytesUsed();
|
||||
bytesUsed += allActiveThreads.next().dwpt.bytesUsed();
|
||||
}
|
||||
assertEquals(bytesUsed, flushControl.activeBytes());
|
||||
}
|
||||
|
@ -343,7 +343,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
if (state.flushPending) {
|
||||
toFlush = state;
|
||||
} else if (flushOnDeleteTerms()
|
||||
&& state.perThread.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
|
||||
&& state.dwpt.pendingDeletes.numTermDeletes.get() >= indexWriterConfig
|
||||
.getMaxBufferedDeleteTerms()) {
|
||||
toFlush = state;
|
||||
} else {
|
||||
|
@ -376,7 +376,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
if (state.flushPending) {
|
||||
toFlush = state;
|
||||
} else if (flushOnDocCount()
|
||||
&& state.perThread.getNumDocsInRAM() >= indexWriterConfig
|
||||
&& state.dwpt.getNumDocsInRAM() >= indexWriterConfig
|
||||
.getMaxBufferedDocs()) {
|
||||
toFlush = state;
|
||||
} else if (flushOnRAM()
|
||||
|
@ -397,7 +397,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
hasMarkedPending = true;
|
||||
} else {
|
||||
peakBytesWithoutFlush = Math.max(activeBytes, peakBytesWithoutFlush);
|
||||
peakDocCountWithoutFlush = Math.max(state.perThread.getNumDocsInRAM(),
|
||||
peakDocCountWithoutFlush = Math.max(state.dwpt.getNumDocsInRAM(),
|
||||
peakDocCountWithoutFlush);
|
||||
}
|
||||
|
||||
|
@ -409,7 +409,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
|
|||
|
||||
static void findPending(DocumentsWriterFlushControl flushControl,
|
||||
ArrayList<ThreadState> pending, ArrayList<ThreadState> notPending) {
|
||||
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreads();
|
||||
Iterator<ThreadState> allActiveThreads = flushControl.allActiveThreadStates();
|
||||
while (allActiveThreads.hasNext()) {
|
||||
ThreadState next = allActiveThreads.next();
|
||||
if (next.flushPending) {
|
||||
|
|
Loading…
Reference in New Issue