LUCENE-5644: switch to simpler LIFO thread to ThreadState allocator during indexing

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1593226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2014-05-08 10:13:24 +00:00
parent 52a33e54e3
commit 8806d866fc
11 changed files with 90 additions and 287 deletions

View File

@ -435,7 +435,7 @@ final class DocumentsWriter implements Closeable {
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
perThread.unlock();
perThreadPool.release(perThread);
}
return postUpdate(flushingDWPT, hasEvents);
@ -476,7 +476,7 @@ final class DocumentsWriter implements Closeable {
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
perThread.unlock();
perThreadPool.release(perThread);
}
return postUpdate(flushingDWPT, hasEvents);

View File

@ -458,7 +458,7 @@ final class DocumentsWriterFlushControl {
return perThread;
} finally {
if (!success) { // make sure we unlock if this fails
perThread.unlock();
perThreadPool.release(perThread);
}
}
}

View File

@ -18,6 +18,8 @@ package org.apache.lucene.index;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.ThreadInterruptedException;
/**
* {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
* and their thread assignments during indexing. Each {@link ThreadState} holds
@ -33,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
* new {@link DocumentsWriterPerThread} instance.
* </p>
*/
abstract class DocumentsWriterPerThreadPool implements Cloneable {
final class DocumentsWriterPerThreadPool implements Cloneable {
/**
* {@link ThreadState} references and guards a
@ -125,9 +127,12 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
}
}
private ThreadState[] threadStates;
private final ThreadState[] threadStates;
private volatile int numThreadStatesActive;
private final ThreadState[] freeList;
private int freeCount;
/**
* Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
*/
@ -140,6 +145,7 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
for (int i = 0; i < threadStates.length; i++) {
threadStates[i] = new ThreadState(null);
}
freeList = new ThreadState[maxNumThreadStates];
}
@Override
@ -148,19 +154,8 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
if (numThreadStatesActive != 0) {
throw new IllegalStateException("clone this object before it is used!");
}
DocumentsWriterPerThreadPool clone;
try {
clone = (DocumentsWriterPerThreadPool) super.clone();
} catch (CloneNotSupportedException e) {
// should not happen
throw new RuntimeException(e);
}
clone.threadStates = new ThreadState[threadStates.length];
for (int i = 0; i < threadStates.length; i++) {
clone.threadStates[i] = new ThreadState(null);
}
return clone;
return new DocumentsWriterPerThreadPool(threadStates.length);
}
/**
@ -189,30 +184,29 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
* @return a new {@link ThreadState} iff any new state is available otherwise
* <code>null</code>
*/
synchronized ThreadState newThreadState() {
if (numThreadStatesActive < threadStates.length) {
final ThreadState threadState = threadStates[numThreadStatesActive];
threadState.lock(); // lock so nobody else will get this ThreadState
boolean unlock = true;
try {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
assert threadState.dwpt == null;
unlock = false;
return threadState;
}
// unlock since the threadstate is not active anymore - we are closed!
assert assertUnreleasedThreadStatesInactive();
return null;
} finally {
if (unlock) {
// in any case make sure we unlock if we fail
threadState.unlock();
}
private ThreadState newThreadState() {
assert numThreadStatesActive < threadStates.length;
final ThreadState threadState = threadStates[numThreadStatesActive];
threadState.lock(); // lock so nobody else will get this ThreadState
boolean unlock = true;
try {
if (threadState.isActive()) {
// unreleased thread states are deactivated during DW#close()
numThreadStatesActive++; // increment will publish the ThreadState
//System.out.println("activeCount=" + numThreadStatesActive);
assert threadState.dwpt == null;
unlock = false;
return threadState;
}
// we are closed: unlock since the threadstate is not active anymore
assert assertUnreleasedThreadStatesInactive();
return null;
} finally {
if (unlock) {
// in any case make sure we unlock if we fail
threadState.unlock();
}
}
return null;
}
private synchronized boolean assertUnreleasedThreadStatesInactive() {
@ -240,6 +234,9 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
threadState.unlock();
}
}
// In case any threads are waiting for indexing:
notifyAll();
}
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
@ -256,11 +253,48 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
void recycle(DocumentsWriterPerThread dwpt) {
// don't recycle DWPT by default
}
// you cannot subclass this without being in o.a.l.index package anyway, so
// the class is already pkg-private... fix me: see LUCENE-4013
abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = null;
synchronized (this) {
while (true) {
if (freeCount > 0) {
// Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
// limited number of thread states:
threadState = freeList[freeCount-1];
freeCount--;
break;
} else if (numThreadStatesActive < threadStates.length) {
// ThreadState is already locked before return by this method:
return newThreadState();
} else {
// Wait until a thread state frees up:
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
}
// This could take time, e.g. if the threadState is [briefly] checked for flushing:
threadState.lock();
return threadState;
}
void release(ThreadState state) {
state.unlock();
synchronized (this) {
assert freeCount < freeList.length;
freeList[freeCount++] = state;
// In case any thread is waiting, wake one of them up since we just released a thread state; notify() should be sufficient but we do
// notifyAll defensively:
notifyAll();
}
}
/**
* Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the

View File

@ -345,11 +345,7 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig implements Cl
}
/** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
* IndexWriter to assign thread-states to incoming indexing threads. If no
* {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
* {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
* thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
* {@link #DEFAULT_MAX_THREAD_STATES}).
* IndexWriter to assign thread-states to incoming indexing threads.
* </p>
* <p>
* NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
@ -379,17 +375,13 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig implements Cl
*
* <p>Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
this.indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates);
this.indexerThreadPool = new DocumentsWriterPerThreadPool(maxThreadStates);
return this;
}
@Override
public int getMaxThreadStates() {
try {
return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
} catch (ClassCastException cce) {
throw new IllegalStateException(cce);
}
return indexerThreadPool.getMaxThreadStates();
}
/** By default, IndexWriter does not pool the

View File

@ -125,7 +125,7 @@ public class LiveIndexWriterConfig {
mergePolicy = new TieredMergePolicy();
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
@ -404,11 +404,7 @@ public class LiveIndexWriterConfig {
* documents at once in IndexWriter.
*/
public int getMaxThreadStates() {
try {
return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
} catch (ClassCastException cce) {
throw new IllegalStateException(cce);
}
return indexerThreadPool.getMaxThreadStates();
}
/**

View File

@ -1,111 +0,0 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
/**
* A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
* indexing thread to the same {@link ThreadState} each time the thread tries to
* obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
* associated with the creating thread. Subsequently, if the threads associated
* {@link ThreadState} is not in use it will be associated with the requesting
* thread. Otherwise, if the {@link ThreadState} is used by another thread
* {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
* minimal contended {@link ThreadState}.
*/
class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<>();
/**
* Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
*/
public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
super(maxNumPerThreads);
assert getMaxThreadStates() >= 1;
}
@Override
public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
ThreadState threadState = threadBindings.get(requestingThread);
if (threadState != null && threadState.tryLock()) {
return threadState;
}
ThreadState minThreadState = null;
/* TODO -- another thread could lock the minThreadState we just got while
we should somehow prevent this. */
// Find the state that has minimum number of threads waiting
minThreadState = minContendedThreadState();
if (minThreadState == null || minThreadState.hasQueuedThreads()) {
final ThreadState newState = newThreadState(); // state is already locked if non-null
if (newState != null) {
assert newState.isHeldByCurrentThread();
threadBindings.put(requestingThread, newState);
return newState;
} else if (minThreadState == null) {
/*
* no new threadState available we just take the minContented one
* This must return a valid thread state since we accessed the
* synced context in newThreadState() above.
*/
minThreadState = minContendedThreadState();
}
} else {
threadBindings.put(requestingThread, minThreadState);
}
assert minThreadState != null: "ThreadState is null";
minThreadState.lock();
if (minThreadState.isInitialized() == false) {
// Another thread just flushed this thread state and cleared our binding; put it back:
threadBindings.put(requestingThread, minThreadState); // make sure we get the same state next time
}
return minThreadState;
}
@Override
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
// Remove all previous bindings to this ThreadState on flush:
Iterator<Map.Entry<Thread,ThreadState>> it = threadBindings.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Thread,ThreadState> ent = it.next();
if (ent.getValue() == threadState) {
it.remove();
}
}
return super.reset(threadState, closed);
}
@Override
public ThreadAffinityDocumentsWriterThreadPool clone() {
ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
clone.threadBindings = new ConcurrentHashMap<>();
return clone;
}
@Override
public String toString() {
return "ThreadAffinityDocumentsWriterThreadPool(maxThreadStates=" + getMaxThreadStates() + ")";
}
}

View File

@ -71,7 +71,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
analyzer).setFlushPolicy(flushPolicy);
final int numDWPT = 1 + atLeast(2);
DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
iwc.setIndexerThreadPool(threadPool);
iwc.setRAMBufferSizeMB(maxRamMB);
@ -128,7 +128,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
new MockAnalyzer(random())).setFlushPolicy(flushPolicy);
final int numDWPT = 1 + atLeast(2);
DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
iwc.setIndexerThreadPool(threadPool);
iwc.setMaxBufferedDocs(2 + atLeast(10));
@ -179,7 +179,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
iwc.setFlushPolicy(flushPolicy);
final int numDWPT = 1 + random().nextInt(8);
DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numDWPT);
iwc.setIndexerThreadPool(threadPool);
@ -245,7 +245,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
iwc.setFlushPolicy(flushPolicy);
DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
numThreads[i]== 1 ? 1 : 2);
iwc.setIndexerThreadPool(threadPool);
// with such a small ram buffer we should be stalled quiet quickly

View File

@ -75,7 +75,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
assertNull(conf.getMergedSegmentWarmer());
assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
assertEquals(ThreadAffinityDocumentsWriterThreadPool.class, conf.getIndexerThreadPool().getClass());
assertEquals(DocumentsWriterPerThreadPool.class, conf.getIndexerThreadPool().getClass());
assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
assertEquals(Codec.getDefault(), conf.getCodec());

View File

@ -199,7 +199,7 @@ public class TestStressIndexing2 extends LuceneTestCase {
Map<String,Document> docs = new HashMap<>();
IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE)
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
.setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new DocumentsWriterPerThreadPool(maxThreadStates))
.setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()), new YieldTestPoint());
LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
lmp.setNoCFSRatio(0.0);

View File

@ -1,90 +0,0 @@
package org.apache.lucene.index;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Random;
/**
*
* A <code>DocumentsWriterPerThreadPool<code> that selects thread states at random.
*
* @lucene.internal
* @lucene.experimental
*/
class RandomDocumentsWriterPerThreadPool extends DocumentsWriterPerThreadPool {
private final ThreadState[] states;
private final Random random;
private final int maxRetry;
public RandomDocumentsWriterPerThreadPool(int maxNumPerThreads, Random random) {
super(maxNumPerThreads);
assert getMaxThreadStates() >= 1;
states = new ThreadState[maxNumPerThreads];
this.random = new Random(random.nextLong());
this.maxRetry = 1 + random.nextInt(10);
}
@Override
ThreadState getAndLock(Thread requestingThread,
DocumentsWriter documentsWriter) {
ThreadState threadState = null;
if (getActiveThreadState() == 0) {
synchronized (this) {
if (getActiveThreadState() == 0) {
threadState = states[0] = newThreadState();
return threadState;
}
}
}
assert getActiveThreadState() > 0;
for (int i = 0; i < maxRetry; i++) {
int ord = random.nextInt(getActiveThreadState());
synchronized (this) {
threadState = states[ord];
assert threadState != null;
}
if (threadState.tryLock()) {
return threadState;
}
if (random.nextInt(20) == 0) {
break;
}
}
/*
* only try to create a new threadstate if we can not lock the randomly
* selected state. this is important since some tests rely on a single
* threadstate in the single threaded case. Eventually it would be nice if
* we would not have this limitation but for now we just make sure we only
* allocate one threadstate if indexing is single threaded
*/
synchronized (this) {
ThreadState newThreadState = newThreadState();
if (newThreadState != null) { // did we get a new state?
threadState = states[getActiveThreadState() - 1] = newThreadState;
assert threadState.isHeldByCurrentThread();
return threadState;
}
// if no new state is available lock the random one
}
assert threadState != null;
threadState.lock();
return threadState;
}
}

View File

@ -915,25 +915,7 @@ public abstract class LuceneTestCase extends Assert {
int maxNumThreadStates = rarely(r) ? TestUtil.nextInt(r, 5, 20) // crazy value
: TestUtil.nextInt(r, 1, 4); // reasonable value
try {
if (rarely(r)) {
// Retrieve the package-private setIndexerThreadPool
// method:
Method setIndexerThreadPoolMethod = IndexWriterConfig.class.getDeclaredMethod("setIndexerThreadPool",
Class.forName("org.apache.lucene.index.DocumentsWriterPerThreadPool"));
setIndexerThreadPoolMethod.setAccessible(true);
Class<?> clazz = Class.forName("org.apache.lucene.index.RandomDocumentsWriterPerThreadPool");
Constructor<?> ctor = clazz.getConstructor(int.class, Random.class);
ctor.setAccessible(true);
// random thread pool
setIndexerThreadPoolMethod.invoke(c, ctor.newInstance(maxNumThreadStates, r));
} else {
// random thread pool
c.setMaxThreadStates(maxNumThreadStates);
}
} catch (Exception e) {
Rethrow.rethrow(e);
}
c.setMaxThreadStates(maxNumThreadStates);
}
c.setMergePolicy(newMergePolicy(r));