From 8806d866fc169261fe176197815f3011380a8470 Mon Sep 17 00:00:00 2001
From: Michael McCandless
null
*/
- synchronized ThreadState newThreadState() {
- if (numThreadStatesActive < threadStates.length) {
- final ThreadState threadState = threadStates[numThreadStatesActive];
- threadState.lock(); // lock so nobody else will get this ThreadState
- boolean unlock = true;
- try {
- if (threadState.isActive()) {
- // unreleased thread states are deactivated during DW#close()
- numThreadStatesActive++; // increment will publish the ThreadState
- assert threadState.dwpt == null;
- unlock = false;
- return threadState;
- }
- // unlock since the threadstate is not active anymore - we are closed!
- assert assertUnreleasedThreadStatesInactive();
- return null;
- } finally {
- if (unlock) {
- // in any case make sure we unlock if we fail
- threadState.unlock();
- }
+ private ThreadState newThreadState() {
+ assert numThreadStatesActive < threadStates.length;
+ final ThreadState threadState = threadStates[numThreadStatesActive];
+ threadState.lock(); // lock so nobody else will get this ThreadState
+ boolean unlock = true;
+ try {
+ if (threadState.isActive()) {
+ // unreleased thread states are deactivated during DW#close()
+ numThreadStatesActive++; // increment will publish the ThreadState
+ //System.out.println("activeCount=" + numThreadStatesActive);
+ assert threadState.dwpt == null;
+ unlock = false;
+ return threadState;
+ }
+ // we are closed: unlock since the threadstate is not active anymore
+ assert assertUnreleasedThreadStatesInactive();
+ return null;
+ } finally {
+ if (unlock) {
+ // in any case make sure we unlock if we fail
+ threadState.unlock();
}
}
- return null;
}
private synchronized boolean assertUnreleasedThreadStatesInactive() {
@@ -240,6 +234,9 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
threadState.unlock();
}
}
+
+ // In case any threads are waiting for indexing:
+ notifyAll();
}
DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
@@ -256,11 +253,48 @@ abstract class DocumentsWriterPerThreadPool implements Cloneable {
void recycle(DocumentsWriterPerThread dwpt) {
// don't recycle DWPT by default
}
-
- // you cannot subclass this without being in o.a.l.index package anyway, so
- // the class is already pkg-private... fix me: see LUCENE-4013
- abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
+ /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
+ ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
+ ThreadState threadState = null;
+ synchronized (this) {
+ while (true) {
+ if (freeCount > 0) {
+ // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
+ // limited number of thread states:
+ threadState = freeList[freeCount-1];
+ freeCount--;
+ break;
+ } else if (numThreadStatesActive < threadStates.length) {
+ // ThreadState is already locked before return by this method:
+ return newThreadState();
+ } else {
+ // Wait until a thread state frees up:
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
+ }
+ }
+
+ // This could take time, e.g. if the threadState is [briefly] checked for flushing:
+ threadState.lock();
+
+ return threadState;
+ }
+
+ void release(ThreadState state) {
+ state.unlock();
+ synchronized (this) {
+ assert freeCount < freeList.length;
+ freeList[freeCount++] = state;
+ // In case any thread is waiting, wake one of them up since we just released a thread state; notify() should be sufficient but we do
+ // notifyAll defensively:
+ notifyAll();
+ }
+ }
/**
* Returns the ith active {@link ThreadState} where i is the
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index eb295a7c50f..b06126797df 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -345,11 +345,7 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig implements Cl
}
/** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
- * IndexWriter to assign thread-states to incoming indexing threads. If no
- * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
- * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
- * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
- * {@link #DEFAULT_MAX_THREAD_STATES}).
+ * IndexWriter to assign thread-states to incoming indexing threads.
*
* * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with @@ -379,17 +375,13 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig implements Cl * *
Only takes effect when IndexWriter is first created. */
public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
- this.indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates);
+ this.indexerThreadPool = new DocumentsWriterPerThreadPool(maxThreadStates);
return this;
}
@Override
public int getMaxThreadStates() {
- try {
- return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
- } catch (ClassCastException cce) {
- throw new IllegalStateException(cce);
- }
+ return indexerThreadPool.getMaxThreadStates();
}
/** By default, IndexWriter does not pool the
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index f4f47d7846f..f3a3e780db1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -125,7 +125,7 @@ public class LiveIndexWriterConfig {
mergePolicy = new TieredMergePolicy();
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
- indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
+ indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
@@ -404,11 +404,7 @@ public class LiveIndexWriterConfig {
* documents at once in IndexWriter.
*/
public int getMaxThreadStates() {
- try {
- return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
- } catch (ClassCastException cce) {
- throw new IllegalStateException(cce);
- }
+ return indexerThreadPool.getMaxThreadStates();
}
/**
diff --git a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
deleted file mode 100644
index 047e5492ebe..00000000000
--- a/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.lucene.index;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.util.Map;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
-
-/**
- * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
- * indexing thread to the same {@link ThreadState} each time the thread tries to
- * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
- * associated with the creating thread. Subsequently, if the threads associated
- * {@link ThreadState} is not in use it will be associated with the requesting
- * thread. Otherwise, if the {@link ThreadState} is used by another thread
- * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
- * minimal contended {@link ThreadState}.
- */
-class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
- private MapDocumentsWriterPerThreadPool
that selects thread states at random.
- *
- * @lucene.internal
- * @lucene.experimental
- */
-class RandomDocumentsWriterPerThreadPool extends DocumentsWriterPerThreadPool {
- private final ThreadState[] states;
- private final Random random;
- private final int maxRetry;
-
- public RandomDocumentsWriterPerThreadPool(int maxNumPerThreads, Random random) {
- super(maxNumPerThreads);
- assert getMaxThreadStates() >= 1;
- states = new ThreadState[maxNumPerThreads];
- this.random = new Random(random.nextLong());
- this.maxRetry = 1 + random.nextInt(10);
- }
-
- @Override
- ThreadState getAndLock(Thread requestingThread,
- DocumentsWriter documentsWriter) {
- ThreadState threadState = null;
- if (getActiveThreadState() == 0) {
- synchronized (this) {
- if (getActiveThreadState() == 0) {
- threadState = states[0] = newThreadState();
- return threadState;
- }
- }
- }
- assert getActiveThreadState() > 0;
- for (int i = 0; i < maxRetry; i++) {
- int ord = random.nextInt(getActiveThreadState());
- synchronized (this) {
- threadState = states[ord];
- assert threadState != null;
- }
-
- if (threadState.tryLock()) {
- return threadState;
- }
- if (random.nextInt(20) == 0) {
- break;
- }
- }
- /*
- * only try to create a new threadstate if we can not lock the randomly
- * selected state. this is important since some tests rely on a single
- * threadstate in the single threaded case. Eventually it would be nice if
- * we would not have this limitation but for now we just make sure we only
- * allocate one threadstate if indexing is single threaded
- */
-
- synchronized (this) {
- ThreadState newThreadState = newThreadState();
- if (newThreadState != null) { // did we get a new state?
- threadState = states[getActiveThreadState() - 1] = newThreadState;
- assert threadState.isHeldByCurrentThread();
- return threadState;
- }
- // if no new state is available lock the random one
- }
- assert threadState != null;
- threadState.lock();
- return threadState;
- }
-
-}
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
index 1baeba91d5d..13a73fbe7ed 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
@@ -915,25 +915,7 @@ public abstract class LuceneTestCase extends Assert {
int maxNumThreadStates = rarely(r) ? TestUtil.nextInt(r, 5, 20) // crazy value
: TestUtil.nextInt(r, 1, 4); // reasonable value
- try {
- if (rarely(r)) {
- // Retrieve the package-private setIndexerThreadPool
- // method:
- Method setIndexerThreadPoolMethod = IndexWriterConfig.class.getDeclaredMethod("setIndexerThreadPool",
- Class.forName("org.apache.lucene.index.DocumentsWriterPerThreadPool"));
- setIndexerThreadPoolMethod.setAccessible(true);
- Class> clazz = Class.forName("org.apache.lucene.index.RandomDocumentsWriterPerThreadPool");
- Constructor> ctor = clazz.getConstructor(int.class, Random.class);
- ctor.setAccessible(true);
- // random thread pool
- setIndexerThreadPoolMethod.invoke(c, ctor.newInstance(maxNumThreadStates, r));
- } else {
- // random thread pool
- c.setMaxThreadStates(maxNumThreadStates);
- }
- } catch (Exception e) {
- Rethrow.rethrow(e);
- }
+ c.setMaxThreadStates(maxNumThreadStates);
}
c.setMergePolicy(newMergePolicy(r));