LUCENE-6676: remove isActive boolean

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1691034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2015-07-14 17:11:45 +00:00
parent 1498d5b0b7
commit 24136f1fe5
3 changed files with 26 additions and 75 deletions

View File

@ -266,7 +266,6 @@ final class DocumentsWriter implements Closeable, Accountable {
/** Returns how many documents were aborted. */ /** Returns how many documents were aborted. */
private int abortThreadState(final ThreadState perThread) { private int abortThreadState(final ThreadState perThread) {
assert perThread.isHeldByCurrentThread(); assert perThread.isHeldByCurrentThread();
if (perThread.isActive()) { // we might be closed
if (perThread.isInitialized()) { if (perThread.isInitialized()) {
try { try {
int abortedDocCount = perThread.dwpt.getNumDocsInRAM(); int abortedDocCount = perThread.dwpt.getNumDocsInRAM();
@ -281,10 +280,6 @@ final class DocumentsWriter implements Closeable, Accountable {
// This DWPT was never initialized so it has no indexed documents: // This DWPT was never initialized so it has no indexed documents:
return 0; return 0;
} }
} else {
assert closed;
return 0;
}
} }
synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) { synchronized void unlockAllAfterAbortAll(IndexWriter indexWriter) {
@ -393,9 +388,8 @@ final class DocumentsWriter implements Closeable, Accountable {
} }
private void ensureInitialized(ThreadState state) throws IOException { private void ensureInitialized(ThreadState state) throws IOException {
if (state.isActive() && state.dwpt == null) { if (state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder( final FieldInfos.Builder infos = new FieldInfos.Builder(writer.globalFieldNumberMap);
writer.globalFieldNumberMap);
state.dwpt = new DocumentsWriterPerThread(writer, writer.newSegmentName(), directoryOrig, state.dwpt = new DocumentsWriterPerThread(writer, writer.newSegmentName(), directoryOrig,
directory, config, infoStream, deleteQueue, infos, directory, config, infoStream, deleteQueue, infos,
writer.pendingNumDocs, writer.enableTestPoints); writer.pendingNumDocs, writer.enableTestPoints);
@ -410,10 +404,9 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
try { try {
if (!perThread.isActive()) { // This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
ensureOpen(); ensureOpen();
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread); ensureInitialized(perThread);
assert perThread.isInitialized(); assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;
@ -448,10 +441,9 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread flushingDWPT; final DocumentsWriterPerThread flushingDWPT;
try { try {
if (!perThread.isActive()) { // This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
ensureOpen(); ensureOpen();
assert false: "perThread is not active but we are still open";
}
ensureInitialized(perThread); ensureInitialized(perThread);
assert perThread.isInitialized(); assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt; final DocumentsWriterPerThread dwpt = perThread.dwpt;

View File

@ -288,7 +288,7 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
assert assertMemory(); assert assertMemory();
// Take it out of the loop this DWPT is stale // Take it out of the loop this DWPT is stale
perThreadPool.reset(state, closed); perThreadPool.reset(state);
} finally { } finally {
updateStallState(); updateStallState();
} }
@ -306,7 +306,7 @@ final class DocumentsWriterFlushControl implements Accountable {
assert fullFlush : "can not block if fullFlush == false"; assert fullFlush : "can not block if fullFlush == false";
final DocumentsWriterPerThread dwpt; final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; final long bytes = perThread.bytesUsed;
dwpt = perThreadPool.reset(perThread, closed); dwpt = perThreadPool.reset(perThread);
numPending--; numPending--;
blockedFlushes.add(new BlockedFlush(dwpt, bytes)); blockedFlushes.add(new BlockedFlush(dwpt, bytes));
} finally { } finally {
@ -314,8 +314,7 @@ final class DocumentsWriterFlushControl implements Accountable {
} }
} }
private DocumentsWriterPerThread internalTryCheckOutForFlush( private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
ThreadState perThread) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert perThread.flushPending; assert perThread.flushPending;
try { try {
@ -327,7 +326,7 @@ final class DocumentsWriterFlushControl implements Accountable {
final DocumentsWriterPerThread dwpt; final DocumentsWriterPerThread dwpt;
final long bytes = perThread.bytesUsed; // do that before final long bytes = perThread.bytesUsed; // do that before
// replace! // replace!
dwpt = perThreadPool.reset(perThread, closed); dwpt = perThreadPool.reset(perThread);
assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing"; assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
// Record the flushing DWPT to reduce flushBytes in doAfterFlush // Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes)); flushingWriters.put(dwpt, Long.valueOf(bytes));
@ -379,10 +378,8 @@ final class DocumentsWriterFlushControl implements Accountable {
synchronized void setClosed() { synchronized void setClosed() {
// set by DW to signal that we should not release new DWPT after close // set by DW to signal that we should not release new DWPT after close
if (!closed) {
this.closed = true; this.closed = true;
} }
}
/** /**
* Returns an iterator that provides access to all currently active {@link ThreadState}s * Returns an iterator that provides access to all currently active {@link ThreadState}s
@ -492,9 +489,6 @@ final class DocumentsWriterFlushControl implements Accountable {
next.lock(); next.lock();
try { try {
if (!next.isInitialized()) { if (!next.isInitialized()) {
if (closed && next.isActive()) {
perThreadPool.deactivateThreadState(next);
}
continue; continue;
} }
assert next.dwpt.deleteQueue == flushingQueue assert next.dwpt.deleteQueue == flushingQueue
@ -564,7 +558,7 @@ final class DocumentsWriterFlushControl implements Accountable {
fullFlushBuffer.add(flushingDWPT); fullFlushBuffer.add(flushingDWPT);
} }
} else { } else {
perThreadPool.reset(perThread, closed); // make this state inactive perThreadPool.reset(perThread); // make this state inactive
} }
} }

View File

@ -60,20 +60,11 @@ final class DocumentsWriterPerThreadPool {
// TODO this should really be part of DocumentsWriterFlushControl // TODO this should really be part of DocumentsWriterFlushControl
// write access guarded by DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0; long bytesUsed = 0;
// guarded by Reentrant lock
private boolean isActive = true;
ThreadState(DocumentsWriterPerThread dpwt) { ThreadState(DocumentsWriterPerThread dpwt) {
this.dwpt = dpwt; this.dwpt = dpwt;
} }
/** Mark this ThreadState as inactive, setting dwpt to null.
* @see #isActive() */
private void deactivate() {
isActive = false;
reset();
}
private void reset() { private void reset() {
assert this.isHeldByCurrentThread(); assert this.isHeldByCurrentThread();
this.dwpt = null; this.dwpt = null;
@ -81,19 +72,9 @@ final class DocumentsWriterPerThreadPool {
this.flushPending = false; this.flushPending = false;
} }
/**
* Returns <code>true</code> if this ThreadState is still open. This will
* only return <code>false</code> iff the DW has been closed and this
* ThreadState is already checked out for flush.
*/
boolean isActive() {
assert this.isHeldByCurrentThread();
return isActive;
}
boolean isInitialized() { boolean isInitialized() {
assert this.isHeldByCurrentThread(); assert this.isHeldByCurrentThread();
return isActive() && dwpt != null; return dwpt != null;
} }
/** /**
@ -170,14 +151,10 @@ final class DocumentsWriterPerThreadPool {
return threadState; return threadState;
} }
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) { DocumentsWriterPerThread reset(ThreadState threadState) {
assert threadState.isHeldByCurrentThread(); assert threadState.isHeldByCurrentThread();
final DocumentsWriterPerThread dwpt = threadState.dwpt; final DocumentsWriterPerThread dwpt = threadState.dwpt;
if (!closed) {
threadState.reset(); threadState.reset();
} else {
threadState.deactivate();
}
return dwpt; return dwpt;
} }
@ -267,16 +244,4 @@ final class DocumentsWriterPerThreadPool {
} }
return minThreadState; return minThreadState;
} }
/**
* Deactivates an active {@link ThreadState}. Inactive {@link ThreadState} can
* not be used for indexing anymore once they are deactivated. This method should only be used
* if the parent {@link DocumentsWriter} is closed or aborted.
*
* @param threadState the state to deactivate
*/
void deactivateThreadState(ThreadState threadState) {
assert threadState.isActive();
threadState.deactivate();
}
} }