Ensure DWPTPool never releases a new DWPT after it's closed (#1751)

The DWPTPool must not release new DWPTs after it has been closed. Yet, if the pool
is in a state where it prevents new writers from being created in order to swap
the delete queue, it might get closed in the meantime; in that case we fail to throw an
AlreadyClosedException and instead release a new writer, which violates the contract that the
pool is empty after it's closed and all remaining DWPTs have been aborted.
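
The race can be sketched in a few lines. The following is a minimal, self-contained illustration (a hypothetical MiniPool with simplified names, IllegalStateException standing in for Lucene's AlreadyClosedException), not the actual DWPTPool implementation; the essential point is that the closed flag must be re-checked after waiting for the new-writers permit, which is what the ensureOpen() call added in this commit does.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class MiniPool {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition newWritersAllowed = lock.newCondition();
  private boolean newWritersLocked; // true while the delete queue is being swapped
  private boolean closed;

  Object newWriter() throws InterruptedException {
    lock.lock();
    try {
      while (newWritersLocked) {
        newWritersAllowed.await(); // the pool may get closed while we wait here
      }
      if (closed) { // the re-check this commit adds via ensureOpen()
        throw new IllegalStateException("pool is already closed");
      }
      return new Object(); // stands in for a freshly created DWPT
    } finally {
      lock.unlock();
    }
  }

  void lockNewWriters() {
    lock.lock();
    try { newWritersLocked = true; } finally { lock.unlock(); }
  }

  void unlockNewWriters() {
    lock.lock();
    try { newWritersLocked = false; newWritersAllowed.signalAll(); } finally { lock.unlock(); }
  }

  void close() {
    lock.lock();
    try { closed = true; } finally { lock.unlock(); }
  }
}

Without the re-check after the wait, a thread woken up by unlockNewWriters() on an already closed pool would create and hand out a new writer; that is exactly the scenario the new testCloseWhileNewWritersLocked test below exercises.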
Simon Willnauer 2020-08-14 16:04:58 +02:00 committed by GitHub
parent 150a8dacb5
commit 4267734e80
3 changed files with 107 additions and 4 deletions

@@ -96,6 +96,10 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
throw new ThreadInterruptedException(ie);
}
}
// we must check if we are closed since this might happen while we are waiting for the writer permit
// and if we miss that we might release a new DWPT even though the pool is closed. That wouldn't be the
// end of the world, but it would violate the contract that we don't release any new DWPT after this pool is closed
ensureOpen();
DocumentsWriterPerThread dwpt = dwptFactory.get();
dwpt.lock(); // lock so nobody else will get this DWPT
dwpts.add(dwpt);
@@ -108,9 +112,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
/** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
DocumentsWriterPerThread getAndLock() throws IOException {
synchronized (this) {
if (closed) {
throw new AlreadyClosedException("DWPTPool is already closed");
}
ensureOpen();
// Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
// but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly
// a single thread indexing
@@ -127,6 +129,12 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
}
}
private void ensureOpen() {
if (closed) {
throw new AlreadyClosedException("DWPTPool is already closed");
}
}
void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
synchronized (this) {
assert dwpts.contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.Version;
public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
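// exercises the pool's bookkeeping: a DWPT that is marked free is handed out again by getAndLock() (LIFO),
// checkout() removes it from the pool, and after close() the remaining DWPT can still be checked out until the pool is empty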
public void testLockReleaseAndClose() throws IOException {
try (Directory directory = newDirectory()) {
DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(() ->
new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory,
newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false));
DocumentsWriterPerThread first = pool.getAndLock();
assertEquals(1, pool.size());
DocumentsWriterPerThread second = pool.getAndLock();
assertEquals(2, pool.size());
pool.marksAsFreeAndUnlock(first);
assertEquals(2, pool.size());
DocumentsWriterPerThread third = pool.getAndLock();
assertSame(first, third);
assertEquals(2, pool.size());
pool.checkout(third);
assertEquals(1, pool.size());
pool.close();
assertEquals(1, pool.size());
pool.marksAsFreeAndUnlock(second);
assertEquals(1, pool.size());
for (DocumentsWriterPerThread lastPerThread : pool.filterAndLock(x -> true)) {
pool.checkout(lastPerThread);
lastPerThread.unlock();
}
assertEquals(0, pool.size());
}
}
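// reproduces the race fixed by this commit: getAndLock() blocks while new writers are locked for the delete queue swap,
// the pool gets closed in the meantime, and the blocked call must throw AlreadyClosedException instead of releasing a new DWPT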
public void testCloseWhileNewWritersLocked() throws IOException, InterruptedException {
try (Directory directory = newDirectory()) {
DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(() ->
new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory,
newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false));
DocumentsWriterPerThread first = pool.getAndLock();
pool.lockNewWriters();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(() -> {
try {
latch.countDown();
pool.getAndLock();
fail();
} catch (AlreadyClosedException e) {
// fine
} catch (IOException e) {
throw new AssertionError(e);
}
});
t.start();
latch.await();
while (t.getState().equals(Thread.State.WAITING) == false) {
Thread.yield();
}
first.unlock();
pool.close();
pool.unlockNewWriters();
for (DocumentsWriterPerThread perThread : pool.filterAndLock(x -> true)) {
assertTrue(pool.checkout(perThread));
perThread.unlock();
}
assertEquals(0, pool.size());
}
}
}

@@ -59,7 +59,6 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
private final CyclicBarrier syncStart;
boolean diskFull;
Throwable error;
AlreadyClosedException ace;
IndexWriter writer;
boolean noErrors;
volatile int addCount;