[TEST] Remove searchers from tracking map once they are closed

This commit is contained in:
Simon Willnauer 2015-04-29 09:33:20 +02:00
parent bf09e58cb3
commit f18f623878
2 changed files with 53 additions and 27 deletions

View File

@ -21,15 +21,10 @@ package org.elasticsearch.test.engine;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -57,15 +52,6 @@ class AssertingSearcher extends Engine.Searcher {
initialRefCount = wrappedSearcher.reader().getRefCount(); initialRefCount = wrappedSearcher.reader().getRefCount();
this.indexSearcher = indexSearcher; this.indexSearcher = indexSearcher;
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed"; assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
final RuntimeException ex = new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]");
LuceneTestCase.closeAfterSuite(new Closeable() {
@Override
public void close() throws IOException {
if (closed.get() == false) {
throw ex;
}
}
});
} }
@Override @Override
@ -74,7 +60,7 @@ class AssertingSearcher extends Engine.Searcher {
} }
@Override @Override
public void close() throws ElasticsearchException { public void close() {
synchronized (lock) { synchronized (lock) {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
firstReleaseStack = new RuntimeException(); firstReleaseStack = new RuntimeException();
@ -109,4 +95,8 @@ class AssertingSearcher extends Engine.Searcher {
public ShardId shardId() { public ShardId shardId() {
return shardId; return shardId;
} }
public boolean isOpen() {
return closed.get() == false;
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.AssertingIndexSearcher; import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
@ -32,17 +33,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.Map; import java.util.IdentityHashMap;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -54,15 +52,18 @@ public final class MockEngineSupport {
public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio"; public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper"; public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio"; public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false);
private final ESLogger logger = Loggers.getLogger(Engine.class); private final ESLogger logger = Loggers.getLogger(Engine.class);
private final ShardId shardId; private final ShardId shardId;
private final SearcherCloseable searcherCloseable;
private final MockContext mockContext;
public static class MockContext { public static class MockContext {
public final Random random; private final Random random;
public final boolean wrapReader; private final boolean wrapReader;
public final Class<? extends FilterDirectoryReader> wrapper; private final Class<? extends FilterDirectoryReader> wrapper;
public final Settings indexSettings; private final Settings indexSettings;
private final double flushOnClose; private final double flushOnClose;
public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) { public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
@ -74,9 +75,6 @@ public final class MockEngineSupport {
} }
} }
private final MockContext mockContext;
public MockEngineSupport(EngineConfig config) { public MockEngineSupport(EngineConfig config) {
Settings indexSettings = config.getIndexSettings(); Settings indexSettings = config.getIndexSettings();
shardId = config.getShardId(); shardId = config.getShardId();
@ -89,6 +87,8 @@ public final class MockEngineSupport {
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), config.getShardId(), seed, wrapReader); logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), config.getShardId(), seed, wrapReader);
} }
mockContext = new MockContext(random, wrapReader, wrapper, indexSettings); mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
this.searcherCloseable = new SearcherCloseable();
LuceneTestCase.closeAfterSuite(searcherCloseable); // only one suite closeable per Engine
} }
enum CloseAction { enum CloseAction {
@ -176,8 +176,44 @@ public final class MockEngineSupport {
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will // pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager // be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here // on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
return new AssertingSearcher(assertingIndexSearcher, engineSearcher, shardId, logger); AssertingSearcher assertingSearcher = new AssertingSearcher(assertingIndexSearcher, engineSearcher, shardId, logger) {
@Override
public void close() {
try {
searcherCloseable.remove(this);
} finally {
super.close();
}
}
};
searcherCloseable.add(assertingSearcher, engineSearcher.source());
return assertingSearcher;
} }
private static final class SearcherCloseable implements Closeable {
private final IdentityHashMap<AssertingSearcher, RuntimeException> openSearchers = new IdentityHashMap<>();
@Override
public synchronized void close() throws IOException {
if (openSearchers.isEmpty() == false) {
AssertionError error = new AssertionError("Unreleased searchers found");
for (RuntimeException ex : openSearchers.values()) {
error.addSuppressed(ex);
}
throw error;
}
}
void add(AssertingSearcher searcher, String source) {
final RuntimeException ex = new RuntimeException("Unreleased Searcher, source [" + source+ "]");
synchronized (this) {
openSearchers.put(searcher, ex);
}
}
synchronized void remove(AssertingSearcher searcher) {
openSearchers.remove(searcher);
}
}
} }