Mute accounting circuit breaker check after test (#42448)

If we close an engine while a refresh is happening, then we might leak
refCount of some SegmentReaders. We need to skip the ram accounting
circuit breaker check until we have a new Lucene snapshot which includes
the fix for LUCENE-8809.

This also adds a test to the engine but left it muted so we won't forget
to reenable this check.

Closes #30290
This commit is contained in:
Nhat Nguyen 2019-05-23 13:27:34 -04:00
parent 48dc0dca57
commit 02739d038c
3 changed files with 57 additions and 9 deletions

View File

@ -80,6 +80,7 @@ import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -153,6 +154,7 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -5635,4 +5637,44 @@ public class InternalEngineTests extends EngineTestCase {
rollTranslog.join();
assertMaxSeqNoInCommitUserData(engine);
}
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/LUCENE-8809")
public void testRefreshAndFailEngineConcurrently() throws Exception {
AtomicBoolean stopped = new AtomicBoolean();
Semaphore indexedDocs = new Semaphore(0);
Thread indexer = new Thread(() -> {
while (stopped.get() == false) {
String id = Integer.toString(randomIntBetween(1, 100));
try {
engine.index(indexForDoc(createParsedDoc(id, null)));
indexedDocs.release();
} catch (IOException e) {
throw new AssertionError(e);
} catch (AlreadyClosedException e) {
return;
}
}
});
Thread refresher = new Thread(() -> {
while (stopped.get() == false) {
try {
engine.refresh("test", randomFrom(Engine.SearcherScope.values()), randomBoolean());
} catch (AlreadyClosedException e) {
return;
}
}
});
indexer.start();
refresher.start();
indexedDocs.acquire(randomIntBetween(1, 100));
try {
engine.failEngine("test", new IOException("simulated error"));
} finally {
stopped.set(true);
indexer.join();
refresher.join();
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
}
}

View File

@ -55,6 +55,7 @@ import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
@ -269,6 +270,8 @@ public abstract class EngineTestCase extends ESTestCase {
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}

View File

@ -2426,15 +2426,18 @@ public final class InternalTestCluster extends TestCluster {
final CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class, nodeAndClient.node);
CircuitBreaker fdBreaker = breakerService.getBreaker(CircuitBreaker.FIELDDATA);
assertThat("Fielddata breaker not reset to 0 on node: " + name, fdBreaker.getUsed(), equalTo(0L));
try {
assertBusy(() -> {
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat("Accounting breaker not reset to 0 on node: " + name + ", are there still Lucene indices around?",
acctBreaker.getUsed(), equalTo(0L));
});
} catch (Exception e) {
throw new AssertionError("Exception during check for accounting breaker reset to 0", e);
}
// Mute this assertion until we have a new Lucene snapshot with https://issues.apache.org/jira/browse/LUCENE-8809.
// try {
// assertBusy(() -> {
// CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
// assertThat("Accounting breaker not reset to 0 on node: " + name + ", are there still Lucene indices around?",
// acctBreaker.getUsed(), equalTo(0L));
// });
// } catch (Exception e) {
// throw new AssertionError("Exception during check for accounting breaker reset to 0", e);
// }
// Anything that uses transport or HTTP can increase the
// request breaker (because they use bigarrays), because of
// that the breaker can sometimes be incremented from ping