[TEST] Share code for mock engines
Today we have duplicated logic in the MockInternal and MockShadowEngine since they need to subclass the actual engine. This commit shares the most of the code making it easier to add mock engines in the future.
This commit is contained in:
parent
daa25d1b20
commit
982da25f6e
|
@ -35,7 +35,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.monitor.fs.FsStats;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportModule;
|
||||
|
@ -78,7 +78,7 @@ public class CorruptedTranslogTests extends ElasticsearchIntegrationTest {
|
|||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.refresh_interval", "-1")
|
||||
.put(MockInternalEngine.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
|
||||
.put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
|
||||
.put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
|
||||
.put("index.gateway.local.sync", "1s") // fsync the translog every second
|
||||
));
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -109,10 +109,10 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
|
|||
|
||||
ImmutableSettings.Builder settings = settingsBuilder()
|
||||
.put(indexSettings())
|
||||
.put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
|
||||
.put(MockEngineSupport.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
|
||||
.put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
|
||||
.put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
|
||||
.put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
|
||||
.put(MockEngineSupport.WRAP_READER_RATIO, 1.0d);
|
||||
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
|
||||
client().admin().indices().prepareCreate("test")
|
||||
.setSettings(settings)
|
||||
|
@ -202,7 +202,7 @@ public class RandomExceptionCircuitBreakerTests extends ElasticsearchIntegration
|
|||
public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
|
||||
|
||||
// TODO: Generalize this class and add it as a utility
|
||||
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
|
||||
public static class RandomExceptionDirectoryReaderWrapper extends MockEngineSupport.DirectoryReaderWrapper {
|
||||
private final Settings settings;
|
||||
|
||||
static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.search.basic;
|
||||
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterDirectoryReader;
|
||||
import org.apache.lucene.index.LeafReader;
|
||||
import org.apache.lucene.util.English;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
@ -37,7 +38,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||
|
@ -249,10 +250,10 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
|
|||
|
||||
Builder settings = settingsBuilder()
|
||||
.put(indexSettings())
|
||||
.put(MockInternalEngine.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
|
||||
.put(MockEngineSupport.READER_WRAPPER_TYPE, RandomExceptionDirectoryReaderWrapper.class.getName())
|
||||
.put(EXCEPTION_TOP_LEVEL_RATIO_KEY, topLevelRate)
|
||||
.put(EXCEPTION_LOW_LEVEL_RATIO_KEY, lowLevelRate)
|
||||
.put(MockInternalEngine.WRAP_READER_RATIO, 1.0d);
|
||||
.put(MockEngineSupport.WRAP_READER_RATIO, 1.0d);
|
||||
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(settings)
|
||||
|
@ -308,10 +309,10 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
|
|||
public static final String EXCEPTION_LOW_LEVEL_RATIO_KEY = "index.engine.exception.ratio.low";
|
||||
|
||||
|
||||
public static class RandomExceptionDirectoryReaderWrapper extends MockInternalEngine.DirectoryReaderWrapper {
|
||||
public static class RandomExceptionDirectoryReaderWrapper extends MockEngineSupport.DirectoryReaderWrapper {
|
||||
private final Settings settings;
|
||||
|
||||
static class ThrowingSubReaderWrapper extends SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {
|
||||
static class ThrowingSubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper implements ThrowingLeafReaderWrapper.Thrower {
|
||||
private final Random random;
|
||||
private final double topLevelRatio;
|
||||
private final double lowLevelRatio;
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test.engine;
|
||||
|
||||
import org.apache.lucene.index.AssertingDirectoryReader;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterDirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.AssertingIndexSearcher;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Support class to build MockEngines like {@link org.elasticsearch.test.engine.MockInternalEngine} or {@link org.elasticsearch.test.engine.MockShadowEngine}
|
||||
* since they need to subclass the actual engine
|
||||
*/
|
||||
public final class MockEngineSupport {
|
||||
|
||||
public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
|
||||
public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
|
||||
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
|
||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||
private final ESLogger logger = Loggers.getLogger(Engine.class);
|
||||
|
||||
public static class MockContext {
|
||||
public final Random random;
|
||||
public final boolean wrapReader;
|
||||
public final Class<? extends FilterDirectoryReader> wrapper;
|
||||
public final Settings indexSettings;
|
||||
private final double flushOnClose;
|
||||
|
||||
public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
|
||||
this.random = random;
|
||||
this.wrapReader = wrapReader;
|
||||
this.wrapper = wrapper;
|
||||
this.indexSettings = indexSettings;
|
||||
flushOnClose = indexSettings.getAsDouble(FLUSH_ON_CLOSE_RATIO, 0.5d);
|
||||
}
|
||||
}
|
||||
|
||||
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
|
||||
|
||||
private final MockContext mockContext;
|
||||
|
||||
public MockEngineSupport(EngineConfig config) {
|
||||
Settings indexSettings = config.getIndexSettings();
|
||||
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||
Random random = new Random(seed);
|
||||
final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
|
||||
Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
|
||||
boolean wrapReader = random.nextDouble() < ratio;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), config.getShardId(), seed, wrapReader);
|
||||
}
|
||||
mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
|
||||
}
|
||||
|
||||
enum CloseAction {
|
||||
FLUSH_AND_CLOSE,
|
||||
CLOSE;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the CloseAction to execute on the actual engine. Note this method changes the state on
|
||||
* the first call and treats subsequent calls as if the engine passed is already closed.
|
||||
*/
|
||||
public CloseAction flushOrClose(Engine engine, CloseAction originalAction) throws IOException {
|
||||
try {
|
||||
if (closing.compareAndSet(false, true)) { // only do the random thing if we are the first call to this since super.flushOnClose() calls #close() again and then we might end up with a stackoverflow.
|
||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||
return CloseAction.FLUSH_AND_CLOSE;
|
||||
} else {
|
||||
return CloseAction.CLOSE;
|
||||
}
|
||||
} else {
|
||||
return originalAction;
|
||||
}
|
||||
} finally {
|
||||
if (logger.isTraceEnabled()) {
|
||||
// log debug if we have pending searchers
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
logger.trace("Unreleased Searchers instance for shard [{}]",
|
||||
entry.getValue(), entry.getKey().shardId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public AssertingIndexSearcher newSearcher(Engine engine, String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
IndexReader reader = searcher.getIndexReader();
|
||||
IndexReader wrappedReader = reader;
|
||||
assert reader != null;
|
||||
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
||||
wrappedReader = wrapReader((DirectoryReader) reader, engine);
|
||||
}
|
||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
||||
return assertingIndexSearcher;
|
||||
}
|
||||
|
||||
private DirectoryReader wrapReader(DirectoryReader reader, Engine engine) {
|
||||
try {
|
||||
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
|
||||
Constructor<?> nonRandom = null;
|
||||
for (Constructor<?> constructor : constructors) {
|
||||
Class<?>[] parameterTypes = constructor.getParameterTypes();
|
||||
if (parameterTypes.length > 0 && parameterTypes[0] == DirectoryReader.class) {
|
||||
if (parameterTypes.length == 1) {
|
||||
nonRandom = constructor;
|
||||
} else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
|
||||
|
||||
return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nonRandom != null) {
|
||||
return (DirectoryReader) nonRandom.newInstance(reader);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Can not wrap reader", e);
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
|
||||
protected final SubReaderWrapper subReaderWrapper;
|
||||
|
||||
public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) throws IOException {
|
||||
super(in, subReaderWrapper);
|
||||
this.subReaderWrapper = subReaderWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCombinedCoreAndDeletesKey() {
|
||||
return in.getCombinedCoreAndDeletesKey();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -18,176 +18,65 @@
|
|||
*/
|
||||
package org.elasticsearch.test.engine;
|
||||
|
||||
import org.apache.lucene.index.AssertingDirectoryReader;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FilterDirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.AssertingIndexSearcher;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class MockInternalEngine extends InternalEngine {
|
||||
public static final String WRAP_READER_RATIO = "index.engine.mock.random.wrap_reader_ratio";
|
||||
public static final String READER_WRAPPER_TYPE = "index.engine.mock.random.wrapper";
|
||||
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
|
||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||
final class MockInternalEngine extends InternalEngine {
|
||||
private MockEngineSupport support;
|
||||
|
||||
public static class MockContext {
|
||||
public final Random random;
|
||||
public final boolean wrapReader;
|
||||
public final Class<? extends FilterDirectoryReader> wrapper;
|
||||
public final Settings indexSettings;
|
||||
private final double flushOnClose;
|
||||
|
||||
public MockContext(Random random, boolean wrapReader, Class<? extends FilterDirectoryReader> wrapper, Settings indexSettings) {
|
||||
this.random = random;
|
||||
this.wrapReader = wrapReader;
|
||||
this.wrapper = wrapper;
|
||||
this.indexSettings = indexSettings;
|
||||
flushOnClose = indexSettings.getAsDouble(FLUSH_ON_CLOSE_RATIO, 0.5d);
|
||||
}
|
||||
}
|
||||
|
||||
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
|
||||
|
||||
private MockContext mockContext;
|
||||
|
||||
public MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery) throws EngineException {
|
||||
MockInternalEngine(EngineConfig config, boolean skipInitialTranslogRecovery) throws EngineException {
|
||||
super(config, skipInitialTranslogRecovery);
|
||||
}
|
||||
|
||||
private synchronized MockContext getMockContext() {
|
||||
if (mockContext == null) {
|
||||
Settings indexSettings = config().getIndexSettings();
|
||||
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||
Random random = new Random(seed);
|
||||
final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
|
||||
Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
|
||||
boolean wrapReader = random.nextDouble() < ratio;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
|
||||
}
|
||||
mockContext = new MockContext(random, wrapReader, wrapper, indexSettings);
|
||||
private synchronized MockEngineSupport support() {
|
||||
// lazy initialized since we need it already on super() ctor execution :(
|
||||
if (support == null) {
|
||||
support = new MockEngineSupport(config());
|
||||
}
|
||||
return mockContext;
|
||||
return support;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
MockContext mockContext = getMockContext();
|
||||
try {
|
||||
if (closing.compareAndSet(false, true)) { // only do the random thing if we are the first call to this since super.flushOnClose() calls #close() again and then we might end up with a stackoverflow.
|
||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||
super.flushAndClose();
|
||||
} else {
|
||||
super.close();
|
||||
}
|
||||
} else {
|
||||
switch(support().flushOrClose(this, MockEngineSupport.CloseAction.CLOSE)) {
|
||||
case FLUSH_AND_CLOSE:
|
||||
super.flushAndClose();
|
||||
break;
|
||||
case CLOSE:
|
||||
super.close();
|
||||
}
|
||||
} finally {
|
||||
if (logger.isTraceEnabled()) {
|
||||
// log debug if we have pending searchers
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
logger.trace("Unreleased Searchers instance for shard [{}]",
|
||||
entry.getValue(), entry.getKey().shardId());
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushAndClose() throws IOException {
|
||||
switch(support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
|
||||
case FLUSH_AND_CLOSE:
|
||||
super.flushAndClose();
|
||||
break;
|
||||
case CLOSE:
|
||||
super.close();
|
||||
break;
|
||||
}
|
||||
logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushAndClose() throws IOException {
|
||||
MockContext mockContext = getMockContext();
|
||||
if (closing.compareAndSet(false, true)) { // only do the random thing if we are the first call to this since super.flushOnClose() calls #close() again and then we might end up with a stackoverflow.
|
||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||
super.flushAndClose();
|
||||
} else {
|
||||
super.close();
|
||||
}
|
||||
} else {
|
||||
super.flushAndClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
MockContext mockContext = getMockContext();
|
||||
IndexReader reader = searcher.getIndexReader();
|
||||
IndexReader wrappedReader = reader;
|
||||
assert reader != null;
|
||||
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
||||
wrappedReader = wrapReader((DirectoryReader) reader);
|
||||
}
|
||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
||||
final AssertingIndexSearcher assertingIndexSearcher = support().newSearcher(this, source, searcher, manager);
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
||||
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
||||
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
||||
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
||||
return new AssertingSearcher(assertingIndexSearcher,
|
||||
super.newSearcher(source, searcher, manager), shardId, INFLIGHT_ENGINE_SEARCHERS, logger);
|
||||
super.newSearcher(source, searcher, manager), shardId, MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS, logger);
|
||||
}
|
||||
|
||||
private DirectoryReader wrapReader(DirectoryReader reader) {
|
||||
MockContext mockContext = getMockContext();
|
||||
try {
|
||||
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
|
||||
Constructor<?> nonRandom = null;
|
||||
for (Constructor<?> constructor : constructors) {
|
||||
Class<?>[] parameterTypes = constructor.getParameterTypes();
|
||||
if (parameterTypes.length > 0 && parameterTypes[0] == DirectoryReader.class) {
|
||||
if (parameterTypes.length == 1) {
|
||||
nonRandom = constructor;
|
||||
} else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
|
||||
|
||||
return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nonRandom != null) {
|
||||
return (DirectoryReader) nonRandom.newInstance(reader);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Can not wrap reader", e);
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
|
||||
protected final SubReaderWrapper subReaderWrapper;
|
||||
|
||||
public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrapper) throws IOException {
|
||||
super(in, subReaderWrapper);
|
||||
this.subReaderWrapper = subReaderWrapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCoreCacheKey() {
|
||||
return in.getCoreCacheKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getCombinedCoreAndDeletesKey() {
|
||||
return in.getCombinedCoreAndDeletesKey();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,44 +19,24 @@
|
|||
|
||||
package org.elasticsearch.test.engine;
|
||||
|
||||
import org.apache.lucene.index.AssertingDirectoryReader;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.AssertingIndexSearcher;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.engine.ShadowEngine;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class MockShadowEngine extends ShadowEngine {
|
||||
final class MockShadowEngine extends ShadowEngine {
|
||||
private final MockEngineSupport support;
|
||||
|
||||
private final MockInternalEngine.MockContext mockContext;
|
||||
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
|
||||
|
||||
public MockShadowEngine(EngineConfig config) {
|
||||
MockShadowEngine(EngineConfig config) {
|
||||
super(config);
|
||||
Settings indexSettings = config.getIndexSettings();
|
||||
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||
Random random = new Random(seed);
|
||||
final double ratio = indexSettings.getAsDouble(MockInternalEngine.WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
|
||||
Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(MockInternalEngine.READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
|
||||
boolean wrapReader = random.nextDouble() < ratio;
|
||||
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
|
||||
mockContext = new MockInternalEngine.MockContext(random, wrapReader, wrapper, indexSettings);
|
||||
this.support = new MockEngineSupport(config);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
|
@ -64,7 +44,7 @@ public class MockShadowEngine extends ShadowEngine {
|
|||
} finally {
|
||||
if (logger.isTraceEnabled()) {
|
||||
// log debug if we have pending searchers
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId());
|
||||
}
|
||||
}
|
||||
|
@ -73,48 +53,13 @@ public class MockShadowEngine extends ShadowEngine {
|
|||
|
||||
@Override
|
||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||
|
||||
IndexReader reader = searcher.getIndexReader();
|
||||
IndexReader wrappedReader = reader;
|
||||
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
||||
wrappedReader = wrapReader((DirectoryReader) reader);
|
||||
}
|
||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
||||
final AssertingIndexSearcher assertingIndexSearcher = support.newSearcher(this, source, searcher, manager);
|
||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
||||
// pass the original searcher to the super.newSearcher() method to make
|
||||
// sure this is the searcher that will be released later on. If we wrap
|
||||
// an index reader here must not pass the wrapped version to the manager
|
||||
// on release otherwise the reader will be closed too early. - good
|
||||
// news, stuff will fail all over the place if we don't get this
|
||||
// right here
|
||||
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
||||
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
||||
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
||||
return new AssertingSearcher(assertingIndexSearcher,
|
||||
super.newSearcher(source, searcher, manager), shardId,
|
||||
INFLIGHT_ENGINE_SEARCHERS, logger);
|
||||
}
|
||||
|
||||
private DirectoryReader wrapReader(DirectoryReader reader) {
|
||||
try {
|
||||
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
|
||||
Constructor<?> nonRandom = null;
|
||||
for (Constructor<?> constructor : constructors) {
|
||||
Class<?>[] parameterTypes = constructor.getParameterTypes();
|
||||
if (parameterTypes.length > 0 && parameterTypes[0] == DirectoryReader.class) {
|
||||
if (parameterTypes.length == 1) {
|
||||
nonRandom = constructor;
|
||||
} else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
|
||||
|
||||
return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nonRandom != null) {
|
||||
return (DirectoryReader) nonRandom.newInstance(reader);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Can not wrap reader", e);
|
||||
}
|
||||
return reader;
|
||||
super.newSearcher(source, searcher, manager), shardId, MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS, logger);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -66,8 +66,7 @@ import org.elasticsearch.search.SearchHit;
|
|||
import org.elasticsearch.search.suggest.Suggest;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.engine.AssertingSearcher;
|
||||
import org.elasticsearch.test.engine.MockInternalEngine;
|
||||
import org.elasticsearch.test.engine.MockShadowEngine;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.Matchers;
|
||||
|
@ -653,34 +652,27 @@ public class ElasticsearchAssertions {
|
|||
if (awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object o) {
|
||||
return MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
|
||||
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
|
||||
return MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
|
||||
}
|
||||
}, 5, TimeUnit.SECONDS)) {
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
if (MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
|
||||
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
|
||||
if (MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
RuntimeException ex = null;
|
||||
StringBuilder builder = new StringBuilder("Unclosed Searchers instance for shards: [");
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
ex = entry.getValue();
|
||||
builder.append(entry.getKey().shardId()).append(",");
|
||||
}
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
||||
ex = entry.getValue();
|
||||
builder.append(entry.getKey().shardId()).append(",");
|
||||
}
|
||||
builder.append("]");
|
||||
throw new RuntimeException(builder.toString(), ex);
|
||||
} finally {
|
||||
MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
|
||||
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
|
||||
MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue