diff --git a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java index caafdf685af..bbef6e8f474 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java +++ b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java @@ -337,7 +337,7 @@ final class DefaultIndexingChain extends DocConsumer { IndexableFieldType fieldType = field.fieldType(); verifyFieldType(fieldName, fieldType); - + PerField fp = getOrAddField(fieldName, fieldType, false); if (fieldType.stored()) { try { @@ -392,8 +392,9 @@ final class DefaultIndexingChain extends DocConsumer { private void indexDocValue(PerField fp, DocValuesType dvType, StorableField field) throws IOException { if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) { - // This will throw an exc if the caller tried to - // change the DV type for the field: + // This is the first time we are seeing this field indexed with doc values, so we + // now record the DV type so that any future attempt to (illegally) change + // the DV type of this field, will throw an IllegalArgExc: fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType); } fp.fieldInfo.setDocValuesType(dvType); diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index d1e2f706e46..1a0b1691b8c 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -240,6 +240,7 @@ final class DocumentsWriter implements Closeable, Accountable { try { deleteQueue.clear(); final int limit = perThreadPool.getMaxThreadStates(); + perThreadPool.setAbort(); for (int i = 0; i < limit; i++) { final ThreadState perThread = perThreadPool.getThreadState(i); perThread.lock(); @@ -254,7 +255,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "finished lockAndAbortAll success=" + success); } - if (!success) { + if (success == false) { // if something happens here we unlock all states again unlockAllAfterAbortAll(indexWriter); } @@ -291,13 +292,14 @@ final class DocumentsWriter implements Closeable, Accountable { infoStream.message("DW", "unlockAll"); } final int limit = perThreadPool.getMaxThreadStates(); + perThreadPool.clearAbort(); for (int i = 0; i < limit; i++) { try { final ThreadState perThread = perThreadPool.getThreadState(i); if (perThread.isHeldByCurrentThread()) { perThread.unlock(); } - } catch(Throwable e) { + } catch (Throwable e) { if (infoStream.isEnabled("DW")) { infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage()); } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index b3048a55b31..a164f8ce753 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -381,7 +381,6 @@ final class DocumentsWriterFlushControl implements Accountable { // set by DW to signal that we should not release new DWPT after close if (!closed) { this.closed = true; - perThreadPool.deactivateUnreleasedStates(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 5a456aba073..63d5efc87b7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -16,7 +16,7 @@ package org.apache.lucene.index; * limitations under the License. */ -import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.ThreadInterruptedException; import java.util.ArrayList; import java.util.List; @@ -66,15 +66,9 @@ final class DocumentsWriterPerThreadPool { this.dwpt = dpwt; } - /** - * Resets the internal {@link DocumentsWriterPerThread} with the given one. - * if the given DWPT is null this ThreadState is marked as inactive and should not be used - * for indexing anymore. - * @see #isActive() - */ - + /** Mark this ThreadState as inactive, setting dwpt to null. + * @see #isActive() */ private void deactivate() { - assert this.isHeldByCurrentThread(); isActive = false; reset(); } @@ -130,15 +124,25 @@ final class DocumentsWriterPerThreadPool { } private final List threadStates = new ArrayList<>(); - private volatile int numThreadStatesActive; private final List freeList = new ArrayList<>(); + private boolean aborted; + /** * Returns the active number of {@link ThreadState} instances. */ synchronized int getActiveThreadStateCount() { - return numThreadStatesActive; + return threadStates.size(); + } + + synchronized void setAbort() { + aborted = true; + } + + synchronized void clearAbort() { + aborted = false; + notifyAll(); } /** @@ -152,67 +156,19 @@ final class DocumentsWriterPerThreadPool { * null */ private synchronized ThreadState newThreadState() { - assert numThreadStatesActive <= threadStates.size(); - - if (numThreadStatesActive == threadStates.size()) { - threadStates.add(new ThreadState(null)); - } - - ThreadState threadState = threadStates.get(numThreadStatesActive); + while (aborted) { + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + ThreadState threadState = new ThreadState(null); threadState.lock(); // lock so nobody else will get this ThreadState - boolean unlock = true; - try { - if (threadState.isActive()) { - // unreleased thread states are deactivated during DW#close() - numThreadStatesActive++; // increment will publish the ThreadState - //System.out.println("activeCount=" + numThreadStatesActive); - assert threadState.dwpt == null; - unlock = false; - return threadState; - } - // we are closed: unlock since the threadstate is not active anymore - assert assertUnreleasedThreadStatesInactive(); - throw new AlreadyClosedException("this IndexWriter is closed"); - } finally { - if (unlock) { - // in any case make sure we unlock if we fail - threadState.unlock(); - } - } + threadStates.add(threadState); + return threadState; } - // Used by assert - private synchronized boolean assertUnreleasedThreadStatesInactive() { - for (int i = numThreadStatesActive; i < threadStates.size(); i++) { - ThreadState threadState = threadStates.get(i); - assert threadState.tryLock() : "unreleased threadstate should not be locked"; - try { - assert !threadState.isInitialized() : "expected unreleased thread state to be inactive"; - } finally { - threadState.unlock(); - } - } - return true; - } - - /** - * Deactivate all unreleased threadstates - */ - synchronized void deactivateUnreleasedStates() { - for (int i = numThreadStatesActive; i < threadStates.size(); i++) { - final ThreadState threadState = threadStates.get(i); - threadState.lock(); - try { - threadState.deactivate(); - } finally { - threadState.unlock(); - } - } - - // In case any threads are waiting for indexing: - notifyAll(); - } - DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) { assert threadState.isHeldByCurrentThread(); final DocumentsWriterPerThread dwpt = threadState.dwpt; @@ -303,9 +259,7 @@ final class DocumentsWriterPerThreadPool { */ ThreadState minContendedThreadState() { ThreadState minThreadState = null; - final int limit = numThreadStatesActive; - for (int i = 0; i < limit; i++) { - final ThreadState state = threadStates.get(i); + for (ThreadState state : threadStates) { if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) { minThreadState = state; }