mirror of https://github.com/apache/lucene.git
LUCENE-6659: never allocate a new ThreadState when DW is aborting
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1690826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a886c663d3
commit
b922713826
|
@ -337,7 +337,7 @@ final class DefaultIndexingChain extends DocConsumer {
|
|||
IndexableFieldType fieldType = field.fieldType();
|
||||
|
||||
verifyFieldType(fieldName, fieldType);
|
||||
|
||||
|
||||
PerField fp = getOrAddField(fieldName, fieldType, false);
|
||||
if (fieldType.stored()) {
|
||||
try {
|
||||
|
@ -392,8 +392,9 @@ final class DefaultIndexingChain extends DocConsumer {
|
|||
private void indexDocValue(PerField fp, DocValuesType dvType, StorableField field) throws IOException {
|
||||
|
||||
if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) {
|
||||
// This will throw an exc if the caller tried to
|
||||
// change the DV type for the field:
|
||||
// This is the first time we are seeing this field indexed with doc values, so we
|
||||
// now record the DV type so that any future attempt to (illegally) change
|
||||
// the DV type of this field, will throw an IllegalArgExc:
|
||||
fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType);
|
||||
}
|
||||
fp.fieldInfo.setDocValuesType(dvType);
|
||||
|
|
|
@ -240,6 +240,7 @@ final class DocumentsWriter implements Closeable, Accountable {
|
|||
try {
|
||||
deleteQueue.clear();
|
||||
final int limit = perThreadPool.getMaxThreadStates();
|
||||
perThreadPool.setAbort();
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState perThread = perThreadPool.getThreadState(i);
|
||||
perThread.lock();
|
||||
|
@ -254,7 +255,7 @@ final class DocumentsWriter implements Closeable, Accountable {
|
|||
if (infoStream.isEnabled("DW")) {
|
||||
infoStream.message("DW", "finished lockAndAbortAll success=" + success);
|
||||
}
|
||||
if (!success) {
|
||||
if (success == false) {
|
||||
// if something happens here we unlock all states again
|
||||
unlockAllAfterAbortAll(indexWriter);
|
||||
}
|
||||
|
@ -291,13 +292,14 @@ final class DocumentsWriter implements Closeable, Accountable {
|
|||
infoStream.message("DW", "unlockAll");
|
||||
}
|
||||
final int limit = perThreadPool.getMaxThreadStates();
|
||||
perThreadPool.clearAbort();
|
||||
for (int i = 0; i < limit; i++) {
|
||||
try {
|
||||
final ThreadState perThread = perThreadPool.getThreadState(i);
|
||||
if (perThread.isHeldByCurrentThread()) {
|
||||
perThread.unlock();
|
||||
}
|
||||
} catch(Throwable e) {
|
||||
} catch (Throwable e) {
|
||||
if (infoStream.isEnabled("DW")) {
|
||||
infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage());
|
||||
}
|
||||
|
|
|
@ -381,7 +381,6 @@ final class DocumentsWriterFlushControl implements Accountable {
|
|||
// set by DW to signal that we should not release new DWPT after close
|
||||
if (!closed) {
|
||||
this.closed = true;
|
||||
perThreadPool.deactivateUnreleasedStates();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ package org.apache.lucene.index;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -66,15 +66,9 @@ final class DocumentsWriterPerThreadPool {
|
|||
this.dwpt = dpwt;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets the internal {@link DocumentsWriterPerThread} with the given one.
|
||||
* if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
|
||||
* for indexing anymore.
|
||||
* @see #isActive()
|
||||
*/
|
||||
|
||||
/** Mark this ThreadState as inactive, setting dwpt to null.
|
||||
* @see #isActive() */
|
||||
private void deactivate() {
|
||||
assert this.isHeldByCurrentThread();
|
||||
isActive = false;
|
||||
reset();
|
||||
}
|
||||
|
@ -130,15 +124,25 @@ final class DocumentsWriterPerThreadPool {
|
|||
}
|
||||
|
||||
private final List<ThreadState> threadStates = new ArrayList<>();
|
||||
private volatile int numThreadStatesActive;
|
||||
|
||||
private final List<ThreadState> freeList = new ArrayList<>();
|
||||
|
||||
private boolean aborted;
|
||||
|
||||
/**
|
||||
* Returns the active number of {@link ThreadState} instances.
|
||||
*/
|
||||
synchronized int getActiveThreadStateCount() {
|
||||
return numThreadStatesActive;
|
||||
return threadStates.size();
|
||||
}
|
||||
|
||||
synchronized void setAbort() {
|
||||
aborted = true;
|
||||
}
|
||||
|
||||
synchronized void clearAbort() {
|
||||
aborted = false;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -152,67 +156,19 @@ final class DocumentsWriterPerThreadPool {
|
|||
* <code>null</code>
|
||||
*/
|
||||
private synchronized ThreadState newThreadState() {
|
||||
assert numThreadStatesActive <= threadStates.size();
|
||||
|
||||
if (numThreadStatesActive == threadStates.size()) {
|
||||
threadStates.add(new ThreadState(null));
|
||||
}
|
||||
|
||||
ThreadState threadState = threadStates.get(numThreadStatesActive);
|
||||
while (aborted) {
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
}
|
||||
}
|
||||
ThreadState threadState = new ThreadState(null);
|
||||
threadState.lock(); // lock so nobody else will get this ThreadState
|
||||
boolean unlock = true;
|
||||
try {
|
||||
if (threadState.isActive()) {
|
||||
// unreleased thread states are deactivated during DW#close()
|
||||
numThreadStatesActive++; // increment will publish the ThreadState
|
||||
//System.out.println("activeCount=" + numThreadStatesActive);
|
||||
assert threadState.dwpt == null;
|
||||
unlock = false;
|
||||
return threadState;
|
||||
}
|
||||
// we are closed: unlock since the threadstate is not active anymore
|
||||
assert assertUnreleasedThreadStatesInactive();
|
||||
throw new AlreadyClosedException("this IndexWriter is closed");
|
||||
} finally {
|
||||
if (unlock) {
|
||||
// in any case make sure we unlock if we fail
|
||||
threadState.unlock();
|
||||
}
|
||||
}
|
||||
threadStates.add(threadState);
|
||||
return threadState;
|
||||
}
|
||||
|
||||
// Used by assert
|
||||
private synchronized boolean assertUnreleasedThreadStatesInactive() {
|
||||
for (int i = numThreadStatesActive; i < threadStates.size(); i++) {
|
||||
ThreadState threadState = threadStates.get(i);
|
||||
assert threadState.tryLock() : "unreleased threadstate should not be locked";
|
||||
try {
|
||||
assert !threadState.isInitialized() : "expected unreleased thread state to be inactive";
|
||||
} finally {
|
||||
threadState.unlock();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deactivate all unreleased threadstates
|
||||
*/
|
||||
synchronized void deactivateUnreleasedStates() {
|
||||
for (int i = numThreadStatesActive; i < threadStates.size(); i++) {
|
||||
final ThreadState threadState = threadStates.get(i);
|
||||
threadState.lock();
|
||||
try {
|
||||
threadState.deactivate();
|
||||
} finally {
|
||||
threadState.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// In case any threads are waiting for indexing:
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
|
||||
assert threadState.isHeldByCurrentThread();
|
||||
final DocumentsWriterPerThread dwpt = threadState.dwpt;
|
||||
|
@ -303,9 +259,7 @@ final class DocumentsWriterPerThreadPool {
|
|||
*/
|
||||
ThreadState minContendedThreadState() {
|
||||
ThreadState minThreadState = null;
|
||||
final int limit = numThreadStatesActive;
|
||||
for (int i = 0; i < limit; i++) {
|
||||
final ThreadState state = threadStates.get(i);
|
||||
for (ThreadState state : threadStates) {
|
||||
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
|
||||
minThreadState = state;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue