[TEST] Cleanup Direcotry and Searcher mock code
We deployed our own code to check if directories are closed etc an d if serachers are still open. Yet, since we don't have a global cluster anymore we can just use lucene's internal mechanism to do that. This commit removes all special handling and usese LuceneTestCase.closeAfterSuite to fail if certain resources are not closed Closes #10853
This commit is contained in:
parent
4d0a82d928
commit
bc3136a345
|
@ -227,7 +227,6 @@ public class OldIndexBackwardsCompatibilityTests extends ElasticsearchIntegratio
|
||||||
|
|
||||||
void unloadIndex(String indexName) throws Exception {
|
void unloadIndex(String indexName) throws Exception {
|
||||||
ElasticsearchAssertions.assertAcked(client().admin().indices().prepareDelete(indexName).get());
|
ElasticsearchAssertions.assertAcked(client().admin().indices().prepareDelete(indexName).get());
|
||||||
ElasticsearchAssertions.assertAllFilesClosed();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAllVersionsTested() throws Exception {
|
public void testAllVersionsTested() throws Exception {
|
||||||
|
|
|
@ -538,33 +538,6 @@ public class MetaDataStateFormatTest extends ElasticsearchTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// copied from lucene - it's package private
|
|
||||||
final class CloseableDirectory implements Closeable {
|
|
||||||
private final BaseDirectoryWrapper dir;
|
|
||||||
private final TestRuleMarkFailure failureMarker;
|
|
||||||
|
|
||||||
public CloseableDirectory(BaseDirectoryWrapper dir,
|
|
||||||
TestRuleMarkFailure failureMarker) {
|
|
||||||
this.dir = dir;
|
|
||||||
this.failureMarker = failureMarker;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
// We only attempt to check open/closed state if there were no other test
|
|
||||||
// failures.
|
|
||||||
try {
|
|
||||||
if (failureMarker.wasSuccessful() && dir.isOpen()) {
|
|
||||||
Assert.fail("Directory not closed: " + dir);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (dir.isOpen()) {
|
|
||||||
dir.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Path[] content(String glob, Path dir) throws IOException {
|
public Path[] content(String glob, Path dir) throws IOException {
|
||||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
|
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
|
||||||
return Iterators.toArray(stream.iterator(), Path.class);
|
return Iterators.toArray(stream.iterator(), Path.class);
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||||
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
|
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
|
||||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -353,7 +352,7 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
|
||||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||||
.put("gateway.recover_after_nodes", 4)
|
.put("gateway.recover_after_nodes", 4)
|
||||||
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4)
|
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4)
|
||||||
.put(MockDirectoryHelper.CRASH_INDEX, false).build();
|
.put(MockFSDirectoryService.CRASH_INDEX, false).build();
|
||||||
|
|
||||||
internalCluster().startNodesAsync(4, settings).get();
|
internalCluster().startNodesAsync(4, settings).get();
|
||||||
// prevent any rebalance actions during the peer recovery
|
// prevent any rebalance actions during the peer recovery
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.indices.leaks;
|
package org.elasticsearch.indices.leaks;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.BadApple;
|
|
||||||
import org.elasticsearch.common.inject.Injector;
|
import org.elasticsearch.common.inject.Injector;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
|
@ -28,7 +27,6 @@ import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.lang.ref.WeakReference;
|
import java.lang.ref.WeakReference;
|
||||||
|
@ -92,7 +90,6 @@ public class IndicesLeaksTests extends ElasticsearchIntegrationTest {
|
||||||
shardInjector = null;
|
shardInjector = null;
|
||||||
|
|
||||||
cluster().wipeIndices("test");
|
cluster().wipeIndices("test");
|
||||||
MockDirectoryHelper.wrappers.clear(); // we need to clear this to allow the objects to recycle
|
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
System.gc();
|
System.gc();
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.indices.recovery;
|
package org.elasticsearch.indices.recovery;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.LifecycleScope;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||||
|
@ -52,7 +51,7 @@ import org.elasticsearch.snapshots.SnapshotState;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||||
import org.elasticsearch.test.transport.MockTransportService;
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.transport.*;
|
import org.elasticsearch.transport.*;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -522,7 +521,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
|
||||||
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, "1s")
|
.put(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, "1s")
|
||||||
.put("cluster.routing.schedule", "100ms") // aggressive reroute post shard failures
|
.put("cluster.routing.schedule", "100ms") // aggressive reroute post shard failures
|
||||||
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
|
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
|
||||||
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false) // restarted recoveries will delete temp files and write them again
|
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false) // restarted recoveries will delete temp files and write them again
|
||||||
.build();
|
.build();
|
||||||
// start a master node
|
// start a master node
|
||||||
internalCluster().startNode(nodeSettings);
|
internalCluster().startNode(nodeSettings);
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||||
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
|
import org.elasticsearch.test.engine.ThrowingLeafReaderWrapper;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
|
||||||
import org.elasticsearch.test.store.MockFSDirectoryService;
|
import org.elasticsearch.test.store.MockFSDirectoryService;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -108,15 +107,15 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
|
||||||
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
|
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute().get();
|
||||||
client().admin().indices().prepareClose("test").execute().get();
|
client().admin().indices().prepareClose("test").execute().get();
|
||||||
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
|
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate));
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate));
|
||||||
client().admin().indices().prepareOpen("test").execute().get();
|
client().admin().indices().prepareOpen("test").execute().get();
|
||||||
} else {
|
} else {
|
||||||
Builder settings = settingsBuilder()
|
Builder settings = settingsBuilder()
|
||||||
.put("index.number_of_replicas", randomIntBetween(0, 1))
|
.put("index.number_of_replicas", randomIntBetween(0, 1))
|
||||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
|
||||||
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
|
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
|
||||||
client().admin().indices().prepareCreate("test")
|
client().admin().indices().prepareCreate("test")
|
||||||
.setSettings(settings)
|
.setSettings(settings)
|
||||||
|
@ -190,8 +189,8 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
|
||||||
// check the index still contains the records that we indexed without errors
|
// check the index still contains the records that we indexed without errors
|
||||||
client().admin().indices().prepareClose("test").execute().get();
|
client().admin().indices().prepareClose("test").execute().get();
|
||||||
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
|
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, 0)
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE, 0)
|
||||||
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0));
|
.put(MockFSDirectoryService.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0));
|
||||||
client().admin().indices().prepareOpen("test").execute().get();
|
client().admin().indices().prepareOpen("test").execute().get();
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
SearchResponse searchResponse = client().prepareSearch().setTypes("type").setQuery(QueryBuilders.matchQuery("test", "init")).get();
|
SearchResponse searchResponse = client().prepareSearch().setTypes("type").setQuery(QueryBuilders.matchQuery("test", "init")).get();
|
||||||
|
|
|
@ -71,8 +71,6 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllFilesClosed;
|
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSearchersClosed;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base testcase for randomized unit testing with Elasticsearch
|
* Base testcase for randomized unit testing with Elasticsearch
|
||||||
|
@ -205,26 +203,6 @@ public abstract class ElasticsearchTestCase extends LuceneTestCase {
|
||||||
System.clearProperty(EsExecutors.DEFAULT_SYSPROP);
|
System.clearProperty(EsExecutors.DEFAULT_SYSPROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check some things (like MockDirectoryWrappers) are closed where we currently
|
|
||||||
// manage them. TODO: can we add these to LuceneTestCase.closeAfterSuite directly?
|
|
||||||
// or something else simpler instead of the fake closeables?
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setAfterSuiteAssertions() throws Exception {
|
|
||||||
closeAfterSuite(new Closeable() {
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
assertAllFilesClosed();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
closeAfterSuite(new Closeable() {
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
assertAllSearchersClosed();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public final void ensureCleanedUp() throws Exception {
|
public final void ensureCleanedUp() throws Exception {
|
||||||
MockPageCacheRecycler.ensureAllPagesAreReleased();
|
MockPageCacheRecycler.ensureAllPagesAreReleased();
|
||||||
|
|
|
@ -81,8 +81,6 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
|
||||||
* This method checks all the things that need to be checked after each test
|
* This method checks all the things that need to be checked after each test
|
||||||
*/
|
*/
|
||||||
public void assertAfterTest() throws IOException {
|
public void assertAfterTest() throws IOException {
|
||||||
assertAllSearchersClosed();
|
|
||||||
assertAllFilesClosed();
|
|
||||||
ensureEstimatedStats();
|
ensureEstimatedStats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,17 +21,21 @@ package org.elasticsearch.test.engine;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexReader;
|
import org.apache.lucene.index.IndexReader;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A searcher that asserts the IndexReader's refcount on close
|
* A searcher that asserts the IndexReader's refcount on close
|
||||||
*/
|
*/
|
||||||
public class AssertingSearcher extends Engine.Searcher {
|
class AssertingSearcher extends Engine.Searcher {
|
||||||
private final Engine.Searcher wrappedSearcher;
|
private final Engine.Searcher wrappedSearcher;
|
||||||
private final ShardId shardId;
|
private final ShardId shardId;
|
||||||
private final IndexSearcher indexSearcher;
|
private final IndexSearcher indexSearcher;
|
||||||
|
@ -39,10 +43,10 @@ public class AssertingSearcher extends Engine.Searcher {
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
private final int initialRefCount;
|
private final int initialRefCount;
|
||||||
private final ESLogger logger;
|
private final ESLogger logger;
|
||||||
private final Map<AssertingSearcher, RuntimeException> inFlightSearchers;
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
|
||||||
public AssertingSearcher(IndexSearcher indexSearcher, Engine.Searcher wrappedSearcher,
|
AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher,
|
||||||
ShardId shardId, Map<AssertingSearcher, RuntimeException> inFlightSearchers,
|
ShardId shardId,
|
||||||
ESLogger logger) {
|
ESLogger logger) {
|
||||||
super(wrappedSearcher.source(), indexSearcher);
|
super(wrappedSearcher.source(), indexSearcher);
|
||||||
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
|
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
|
||||||
|
@ -53,8 +57,15 @@ public class AssertingSearcher extends Engine.Searcher {
|
||||||
initialRefCount = wrappedSearcher.reader().getRefCount();
|
initialRefCount = wrappedSearcher.reader().getRefCount();
|
||||||
this.indexSearcher = indexSearcher;
|
this.indexSearcher = indexSearcher;
|
||||||
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
|
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
|
||||||
this.inFlightSearchers = inFlightSearchers;
|
final RuntimeException ex = new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]");
|
||||||
this.inFlightSearchers.put(this, new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]"));
|
LuceneTestCase.closeAfterSuite(new Closeable() {
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (closed.get() == false) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -64,29 +75,25 @@ public class AssertingSearcher extends Engine.Searcher {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws ElasticsearchException {
|
public void close() throws ElasticsearchException {
|
||||||
RuntimeException remove = inFlightSearchers.remove(this);
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
// make sure we only get this once and store the stack of the first caller!
|
if (closed.compareAndSet(false, true)) {
|
||||||
if (remove == null) {
|
firstReleaseStack = new RuntimeException();
|
||||||
assert firstReleaseStack != null;
|
final int refCount = wrappedSearcher.reader().getRefCount();
|
||||||
|
// this assert seems to be paranoid but given LUCENE-5362 we better add some assertions here to make sure we catch any potential
|
||||||
|
// problems.
|
||||||
|
assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
|
||||||
|
try {
|
||||||
|
wrappedSearcher.close();
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
logger.debug("Failed to release searcher", ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
|
AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
|
||||||
error.initCause(firstReleaseStack);
|
error.initCause(firstReleaseStack);
|
||||||
throw error;
|
throw error;
|
||||||
} else {
|
|
||||||
assert firstReleaseStack == null;
|
|
||||||
firstReleaseStack = new RuntimeException("Searcher Released first here, source [" + wrappedSearcher.source() + "]");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final int refCount = wrappedSearcher.reader().getRefCount();
|
|
||||||
// this assert seems to be paranoid but given LUCENE-5362 we better add some assertions here to make sure we catch any potential
|
|
||||||
// problems.
|
|
||||||
assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
|
|
||||||
try {
|
|
||||||
wrappedSearcher.close();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
logger.debug("Failed to release searcher", ex);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -33,8 +33,10 @@ import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.engine.EngineException;
|
import org.elasticsearch.index.engine.EngineException;
|
||||||
import org.elasticsearch.index.engine.InternalEngine;
|
import org.elasticsearch.index.engine.InternalEngine;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -54,6 +56,7 @@ public final class MockEngineSupport {
|
||||||
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
|
public static final String FLUSH_ON_CLOSE_RATIO = "index.engine.mock.flush_on_close.ratio";
|
||||||
private final AtomicBoolean closing = new AtomicBoolean(false);
|
private final AtomicBoolean closing = new AtomicBoolean(false);
|
||||||
private final ESLogger logger = Loggers.getLogger(Engine.class);
|
private final ESLogger logger = Loggers.getLogger(Engine.class);
|
||||||
|
private final ShardId shardId;
|
||||||
|
|
||||||
public static class MockContext {
|
public static class MockContext {
|
||||||
public final Random random;
|
public final Random random;
|
||||||
|
@ -71,12 +74,12 @@ public final class MockEngineSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private final MockContext mockContext;
|
private final MockContext mockContext;
|
||||||
|
|
||||||
public MockEngineSupport(EngineConfig config) {
|
public MockEngineSupport(EngineConfig config) {
|
||||||
Settings indexSettings = config.getIndexSettings();
|
Settings indexSettings = config.getIndexSettings();
|
||||||
|
shardId = config.getShardId();
|
||||||
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||||
Random random = new Random(seed);
|
Random random = new Random(seed);
|
||||||
final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
|
final double ratio = indexSettings.getAsDouble(WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
|
||||||
|
@ -99,33 +102,23 @@ public final class MockEngineSupport {
|
||||||
* the first call and treats subsequent calls as if the engine passed is already closed.
|
* the first call and treats subsequent calls as if the engine passed is already closed.
|
||||||
*/
|
*/
|
||||||
public CloseAction flushOrClose(Engine engine, CloseAction originalAction) throws IOException {
|
public CloseAction flushOrClose(Engine engine, CloseAction originalAction) throws IOException {
|
||||||
try {
|
if (closing.compareAndSet(false, true)) { // only do the random thing if we are the first call to this since super.flushOnClose() calls #close() again and then we might end up with a stackoverflow.
|
||||||
if (closing.compareAndSet(false, true)) { // only do the random thing if we are the first call to this since super.flushOnClose() calls #close() again and then we might end up with a stackoverflow.
|
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
||||||
if (mockContext.flushOnClose > mockContext.random.nextDouble()) {
|
return CloseAction.FLUSH_AND_CLOSE;
|
||||||
return CloseAction.FLUSH_AND_CLOSE;
|
|
||||||
} else {
|
|
||||||
return CloseAction.CLOSE;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
return originalAction;
|
return CloseAction.CLOSE;
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
// log debug if we have pending searchers
|
|
||||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
|
||||||
logger.trace("Unreleased Searchers instance for shard [{}]",
|
|
||||||
entry.getValue(), entry.getKey().shardId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return originalAction;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public AssertingIndexSearcher newSearcher(Engine engine, String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||||
IndexReader reader = searcher.getIndexReader();
|
IndexReader reader = searcher.getIndexReader();
|
||||||
IndexReader wrappedReader = reader;
|
IndexReader wrappedReader = reader;
|
||||||
assert reader != null;
|
assert reader != null;
|
||||||
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
|
||||||
wrappedReader = wrapReader((DirectoryReader) reader, engine);
|
wrappedReader = wrapReader((DirectoryReader) reader);
|
||||||
}
|
}
|
||||||
// this executes basic query checks and asserts that weights are normalized only once etc.
|
// this executes basic query checks and asserts that weights are normalized only once etc.
|
||||||
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
|
||||||
|
@ -133,7 +126,7 @@ public final class MockEngineSupport {
|
||||||
return assertingIndexSearcher;
|
return assertingIndexSearcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DirectoryReader wrapReader(DirectoryReader reader, Engine engine) {
|
private DirectoryReader wrapReader(DirectoryReader reader) {
|
||||||
try {
|
try {
|
||||||
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
|
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
|
||||||
Constructor<?> nonRandom = null;
|
Constructor<?> nonRandom = null;
|
||||||
|
@ -177,4 +170,14 @@ public final class MockEngineSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) {
|
||||||
|
final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager);
|
||||||
|
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
||||||
|
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
||||||
|
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
||||||
|
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
||||||
|
return new AssertingSearcher(assertingIndexSearcher, engineSearcher, shardId, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.test.engine;
|
||||||
import org.apache.lucene.search.AssertingIndexSearcher;
|
import org.apache.lucene.search.AssertingIndexSearcher;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.SearcherManager;
|
import org.apache.lucene.search.SearcherManager;
|
||||||
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineConfig;
|
import org.elasticsearch.index.engine.EngineConfig;
|
||||||
import org.elasticsearch.index.engine.EngineException;
|
import org.elasticsearch.index.engine.EngineException;
|
||||||
import org.elasticsearch.index.engine.InternalEngine;
|
import org.elasticsearch.index.engine.InternalEngine;
|
||||||
|
@ -71,12 +72,7 @@ final class MockInternalEngine extends InternalEngine {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||||
final AssertingIndexSearcher assertingIndexSearcher = support().newSearcher(this, source, searcher, manager);
|
final Searcher engineSearcher = super.newSearcher(source, searcher, manager);
|
||||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
return support().wrapSearcher(source, engineSearcher, searcher, manager);
|
||||||
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
|
||||||
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
|
||||||
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
|
||||||
return new AssertingSearcher(assertingIndexSearcher,
|
|
||||||
super.newSearcher(source, searcher, manager), shardId, MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS, logger);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,29 +37,10 @@ final class MockShadowEngine extends ShadowEngine {
|
||||||
this.support = new MockEngineSupport(config);
|
this.support = new MockEngineSupport(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
try {
|
|
||||||
super.close();
|
|
||||||
} finally {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
// log debug if we have pending searchers
|
|
||||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
|
||||||
logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
|
||||||
final AssertingIndexSearcher assertingIndexSearcher = support.newSearcher(this, source, searcher, manager);
|
final Searcher engineSearcher = super.newSearcher(source, searcher, manager);
|
||||||
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
|
return support.wrapSearcher(source, engineSearcher, searcher, manager);
|
||||||
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
|
|
||||||
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
|
|
||||||
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
|
|
||||||
return new AssertingSearcher(assertingIndexSearcher,
|
|
||||||
super.newSearcher(source, searcher, manager), shardId, MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS, logger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
|
||||||
import org.apache.lucene.search.BooleanQuery;
|
import org.apache.lucene.search.BooleanQuery;
|
||||||
import org.apache.lucene.search.Query;
|
import org.apache.lucene.search.Query;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionFuture;
|
import org.elasticsearch.action.ActionFuture;
|
||||||
|
@ -67,9 +66,6 @@ import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.suggest.Suggest;
|
import org.elasticsearch.search.suggest.Suggest;
|
||||||
import org.elasticsearch.test.VersionUtils;
|
import org.elasticsearch.test.VersionUtils;
|
||||||
import org.elasticsearch.test.engine.AssertingSearcher;
|
|
||||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -80,7 +76,6 @@ import java.lang.reflect.InvocationTargetException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static com.google.common.base.Predicates.isNull;
|
import static com.google.common.base.Predicates.isNull;
|
||||||
import static org.elasticsearch.test.ElasticsearchTestCase.*;
|
import static org.elasticsearch.test.ElasticsearchTestCase.*;
|
||||||
|
@ -683,67 +678,6 @@ public class ElasticsearchAssertions {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void assertAllSearchersClosed() {
|
|
||||||
/* in some cases we finish a test faster than the freeContext calls make it to the
|
|
||||||
* shards. Let's wait for some time if there are still searchers. If the are really
|
|
||||||
* pending we will fail anyway.*/
|
|
||||||
try {
|
|
||||||
if (awaitBusy(new Predicate<Object>() {
|
|
||||||
@Override
|
|
||||||
public boolean apply(Object o) {
|
|
||||||
return MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
|
|
||||||
}
|
|
||||||
}, 5, TimeUnit.SECONDS)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
if (MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
RuntimeException ex = null;
|
|
||||||
StringBuilder builder = new StringBuilder("Unclosed Searchers instance for shards: [");
|
|
||||||
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
|
|
||||||
ex = entry.getValue();
|
|
||||||
builder.append(entry.getKey().shardId()).append(",");
|
|
||||||
}
|
|
||||||
builder.append("]");
|
|
||||||
throw new RuntimeException(builder.toString(), ex);
|
|
||||||
} finally {
|
|
||||||
MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void assertAllFilesClosed() {
|
|
||||||
try {
|
|
||||||
for (final MockDirectoryHelper.ElasticsearchMockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
|
|
||||||
try {
|
|
||||||
w.awaitClosed(5000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.interrupted();
|
|
||||||
}
|
|
||||||
if (!w.successfullyClosed()) {
|
|
||||||
if (w.closeException() == null) {
|
|
||||||
try {
|
|
||||||
w.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ElasticsearchIllegalStateException("directory close threw IOException", e);
|
|
||||||
}
|
|
||||||
if (w.closeException() != null) {
|
|
||||||
throw w.closeException();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw w.closeException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertThat(w.isOpen(), is(false));
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
MockDirectoryHelper.wrappers.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void assertNodeContainsPlugins(NodesInfoResponse response, String nodeId,
|
public static void assertNodeContainsPlugins(NodesInfoResponse response, String nodeId,
|
||||||
List<String> expectedJvmPluginNames,
|
List<String> expectedJvmPluginNames,
|
||||||
List<String> expectedJvmPluginDescriptions,
|
List<String> expectedJvmPluginDescriptions,
|
||||||
|
|
|
@ -1,199 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to Elasticsearch under one or more contributor
|
|
||||||
* license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright
|
|
||||||
* ownership. Elasticsearch licenses this file to you under
|
|
||||||
* the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
* not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.elasticsearch.test.store;
|
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
|
||||||
import org.apache.lucene.store.Directory;
|
|
||||||
import org.apache.lucene.store.FilterDirectory;
|
|
||||||
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
|
|
||||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
|
||||||
import org.apache.lucene.store.NRTCachingDirectory;
|
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
|
||||||
import org.elasticsearch.index.shard.ShardPath;
|
|
||||||
import org.elasticsearch.index.store.FsDirectoryService;
|
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
|
||||||
import org.elasticsearch.index.store.IndexStoreModule;
|
|
||||||
import com.carrotsearch.randomizedtesting.SeedUtils;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
public class MockDirectoryHelper {
|
|
||||||
public static final String RANDOM_IO_EXCEPTION_RATE = "index.store.mock.random.io_exception_rate";
|
|
||||||
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open";
|
|
||||||
public static final String RANDOM_PREVENT_DOUBLE_WRITE = "index.store.mock.random.prevent_double_write";
|
|
||||||
public static final String RANDOM_NO_DELETE_OPEN_FILE = "index.store.mock.random.no_delete_open_file";
|
|
||||||
public static final String CRASH_INDEX = "index.store.mock.random.crash_index";
|
|
||||||
|
|
||||||
public static final Set<ElasticsearchMockDirectoryWrapper> wrappers = ConcurrentCollections.newConcurrentSet();
|
|
||||||
|
|
||||||
private final Random random;
|
|
||||||
private final double randomIOExceptionRate;
|
|
||||||
private final double randomIOExceptionRateOnOpen;
|
|
||||||
private final Throttling throttle;
|
|
||||||
private final Settings indexSettings;
|
|
||||||
private final ShardId shardId;
|
|
||||||
private final boolean preventDoubleWrite;
|
|
||||||
private final boolean noDeleteOpenFile;
|
|
||||||
private final ESLogger logger;
|
|
||||||
private final boolean crashIndex;
|
|
||||||
|
|
||||||
public MockDirectoryHelper(ShardId shardId, Settings indexSettings, ESLogger logger, Random random, long seed) {
|
|
||||||
this.random = random;
|
|
||||||
randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d);
|
|
||||||
randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d);
|
|
||||||
preventDoubleWrite = indexSettings.getAsBoolean(RANDOM_PREVENT_DOUBLE_WRITE, true); // true is default in MDW
|
|
||||||
noDeleteOpenFile = indexSettings.getAsBoolean(RANDOM_NO_DELETE_OPEN_FILE, random.nextBoolean()); // true is default in MDW
|
|
||||||
random.nextInt(shardId.getId() + 1); // some randomness per shard
|
|
||||||
throttle = Throttling.NEVER;
|
|
||||||
crashIndex = indexSettings.getAsBoolean(CRASH_INDEX, true);
|
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] crashIndex: [{}]", SeedUtils.formatSeed(seed),
|
|
||||||
throttle, crashIndex);
|
|
||||||
}
|
|
||||||
this.indexSettings = indexSettings;
|
|
||||||
this.shardId = shardId;
|
|
||||||
this.logger = logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Directory wrap(Directory dir) {
|
|
||||||
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, logger, this.crashIndex);
|
|
||||||
w.setRandomIOExceptionRate(randomIOExceptionRate);
|
|
||||||
w.setRandomIOExceptionRateOnOpen(randomIOExceptionRateOnOpen);
|
|
||||||
w.setThrottling(throttle);
|
|
||||||
w.setCheckIndexOnClose(false); // we do this on the index level
|
|
||||||
w.setPreventDoubleWrite(preventDoubleWrite);
|
|
||||||
// TODO: make this test robust to virus scanner
|
|
||||||
w.setEnableVirusScanner(false);
|
|
||||||
w.setNoDeleteOpenFile(noDeleteOpenFile);
|
|
||||||
w.setUseSlowOpenClosers(false);
|
|
||||||
wrappers.add(w);
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
||||||
public FsDirectoryService randomDirectorService(IndexStore indexStore, ShardPath path) {
|
|
||||||
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
|
|
||||||
builder.put(indexSettings);
|
|
||||||
builder.put(IndexStoreModule.STORE_TYPE, RandomPicks.randomFrom(random, IndexStoreModule.Type.values()));
|
|
||||||
return new FsDirectoryService(builder.build(), indexStore, path);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper {
|
|
||||||
|
|
||||||
private final ESLogger logger;
|
|
||||||
private final boolean crash;
|
|
||||||
private volatile RuntimeException closeException;
|
|
||||||
private final Object lock = new Object();
|
|
||||||
private final Set<String> superUnSyncedFiles;
|
|
||||||
private final Random superRandomState;
|
|
||||||
|
|
||||||
public ElasticsearchMockDirectoryWrapper(Random random, Directory delegate, ESLogger logger, boolean crash) {
|
|
||||||
super(random, delegate);
|
|
||||||
this.crash = crash;
|
|
||||||
this.logger = logger;
|
|
||||||
|
|
||||||
// TODO: remove all this and cutover to MockFS (DisableFsyncFS) instead
|
|
||||||
try {
|
|
||||||
Field field = MockDirectoryWrapper.class.getDeclaredField("unSyncedFiles");
|
|
||||||
field.setAccessible(true);
|
|
||||||
superUnSyncedFiles = (Set<String>) field.get(this);
|
|
||||||
|
|
||||||
field = MockDirectoryWrapper.class.getDeclaredField("randomState");
|
|
||||||
field.setAccessible(true);
|
|
||||||
superRandomState = (Random) field.get(this);
|
|
||||||
} catch (ReflectiveOperationException roe) {
|
|
||||||
throw new RuntimeException(roe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void close() throws IOException {
|
|
||||||
try {
|
|
||||||
super.close();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
logger.info("MockDirectoryWrapper#close() threw exception", ex);
|
|
||||||
closeException = ex;
|
|
||||||
throw ex;
|
|
||||||
} finally {
|
|
||||||
synchronized (lock) {
|
|
||||||
lock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true if {@link #in} must sync its files.
|
|
||||||
* Currently, only {@link NRTCachingDirectory} requires sync'ing its files
|
|
||||||
* because otherwise they are cached in an internal {@link org.apache.lucene.store.RAMDirectory}. If
|
|
||||||
* other directories require that too, they should be added to this method.
|
|
||||||
*/
|
|
||||||
private boolean mustSync() {
|
|
||||||
Directory delegate = in;
|
|
||||||
while (delegate instanceof FilterDirectory) {
|
|
||||||
if (delegate instanceof NRTCachingDirectory) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
delegate = ((FilterDirectory) delegate).getDelegate();
|
|
||||||
}
|
|
||||||
return delegate instanceof NRTCachingDirectory;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void sync(Collection<String> names) throws IOException {
|
|
||||||
// don't wear out our hardware so much in tests.
|
|
||||||
if (superRandomState.nextInt(100) == 0 || mustSync()) {
|
|
||||||
super.sync(names);
|
|
||||||
} else {
|
|
||||||
superUnSyncedFiles.removeAll(names);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void awaitClosed(long timeout) throws InterruptedException {
|
|
||||||
synchronized (lock) {
|
|
||||||
if(isOpen()) {
|
|
||||||
lock.wait(timeout);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean successfullyClosed() {
|
|
||||||
return closeException == null && !isOpen();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized RuntimeException closeException() {
|
|
||||||
return closeException;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void crash() throws IOException {
|
|
||||||
if (crash) {
|
|
||||||
super.crash();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -19,57 +19,86 @@
|
||||||
|
|
||||||
package org.elasticsearch.test.store;
|
package org.elasticsearch.test.store;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.SeedUtils;
|
||||||
|
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
|
||||||
import org.apache.lucene.index.CheckIndex;
|
import org.apache.lucene.index.CheckIndex;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.*;
|
||||||
import org.apache.lucene.store.LockFactory;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.apache.lucene.store.StoreRateLimiting;
|
import org.apache.lucene.util.TestRuleMarkFailure;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.lease.Releasables;
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.*;
|
import org.elasticsearch.index.shard.*;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.store.IndexStore;
|
||||||
|
import org.elasticsearch.index.store.IndexStoreModule;
|
||||||
import org.elasticsearch.index.store.Store;
|
import org.elasticsearch.index.store.Store;
|
||||||
import org.elasticsearch.index.store.FsDirectoryService;
|
import org.elasticsearch.index.store.FsDirectoryService;
|
||||||
import org.elasticsearch.indices.IndicesLifecycle;
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Arrays;
|
import java.util.*;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
public class MockFSDirectoryService extends FsDirectoryService {
|
public class MockFSDirectoryService extends FsDirectoryService {
|
||||||
|
|
||||||
|
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
|
||||||
|
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open";
|
||||||
|
public static final String RANDOM_PREVENT_DOUBLE_WRITE = "index.store.mock.random.prevent_double_write";
|
||||||
|
public static final String RANDOM_NO_DELETE_OPEN_FILE = "index.store.mock.random.no_delete_open_file";
|
||||||
|
public static final String CRASH_INDEX = "index.store.mock.random.crash_index";
|
||||||
|
|
||||||
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
|
private static final EnumSet<IndexShardState> validCheckIndexStates = EnumSet.of(
|
||||||
IndexShardState.STARTED, IndexShardState.RELOCATED , IndexShardState.POST_RECOVERY
|
IndexShardState.STARTED, IndexShardState.RELOCATED , IndexShardState.POST_RECOVERY
|
||||||
);
|
);
|
||||||
|
|
||||||
private final MockDirectoryHelper helper;
|
private final FsDirectoryService delegateService;
|
||||||
private FsDirectoryService delegateService;
|
|
||||||
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
|
|
||||||
private final boolean checkIndexOnClose;
|
private final boolean checkIndexOnClose;
|
||||||
|
private final Random random;
|
||||||
|
private final double randomIOExceptionRate;
|
||||||
|
private final double randomIOExceptionRateOnOpen;
|
||||||
|
private final MockDirectoryWrapper.Throttling throttle;
|
||||||
|
private final Settings indexSettings;
|
||||||
|
private final boolean preventDoubleWrite;
|
||||||
|
private final boolean noDeleteOpenFile;
|
||||||
|
private final boolean crashIndex;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public MockFSDirectoryService(@IndexSettings Settings indexSettings, IndexStore indexStore, final IndicesService service, final ShardPath path) {
|
public MockFSDirectoryService(@IndexSettings Settings indexSettings, IndexStore indexStore, final IndicesService service, final ShardPath path) {
|
||||||
super(indexSettings, indexStore, path);
|
super(indexSettings, indexStore, path);
|
||||||
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0l);
|
||||||
Random random = new Random(seed);
|
this.random = new Random(seed);
|
||||||
helper = new MockDirectoryHelper(shardId, indexSettings, logger, random, seed);
|
|
||||||
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true);
|
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, true);
|
||||||
|
randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d);
|
||||||
|
randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d);
|
||||||
|
preventDoubleWrite = indexSettings.getAsBoolean(RANDOM_PREVENT_DOUBLE_WRITE, true); // true is default in MDW
|
||||||
|
noDeleteOpenFile = indexSettings.getAsBoolean(RANDOM_NO_DELETE_OPEN_FILE, random.nextBoolean()); // true is default in MDW
|
||||||
|
random.nextInt(shardId.getId() + 1); // some randomness per shard
|
||||||
|
throttle = MockDirectoryWrapper.Throttling.NEVER;
|
||||||
|
crashIndex = indexSettings.getAsBoolean(CRASH_INDEX, true);
|
||||||
|
|
||||||
delegateService = helper.randomDirectorService(indexStore, path);
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] crashIndex: [{}]", SeedUtils.formatSeed(seed),
|
||||||
|
throttle, crashIndex);
|
||||||
|
}
|
||||||
|
this.indexSettings = indexSettings;
|
||||||
|
delegateService = randomDirectorService(indexStore, path);
|
||||||
if (checkIndexOnClose) {
|
if (checkIndexOnClose) {
|
||||||
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
|
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
|
||||||
|
|
||||||
|
@ -112,7 +141,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Directory newDirectory() throws IOException {
|
public Directory newDirectory() throws IOException {
|
||||||
return helper.wrap(delegateService.newDirectory());
|
return wrap(delegateService.newDirectory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -173,4 +202,117 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
||||||
public long throttleTimeInNanos() {
|
public long throttleTimeInNanos() {
|
||||||
return delegateService.throttleTimeInNanos();
|
return delegateService.throttleTimeInNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final String RANDOM_IO_EXCEPTION_RATE = "index.store.mock.random.io_exception_rate";
|
||||||
|
|
||||||
|
private Directory wrap(Directory dir) {
|
||||||
|
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, this.crashIndex);
|
||||||
|
w.setRandomIOExceptionRate(randomIOExceptionRate);
|
||||||
|
w.setRandomIOExceptionRateOnOpen(randomIOExceptionRateOnOpen);
|
||||||
|
w.setThrottling(throttle);
|
||||||
|
w.setCheckIndexOnClose(false); // we do this on the index level
|
||||||
|
w.setPreventDoubleWrite(preventDoubleWrite);
|
||||||
|
// TODO: make this test robust to virus scanner
|
||||||
|
w.setEnableVirusScanner(false);
|
||||||
|
w.setNoDeleteOpenFile(noDeleteOpenFile);
|
||||||
|
w.setUseSlowOpenClosers(false);
|
||||||
|
LuceneTestCase.closeAfterSuite(new CloseableDirectory(w));
|
||||||
|
return w;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FsDirectoryService randomDirectorService(IndexStore indexStore, ShardPath path) {
|
||||||
|
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
|
||||||
|
builder.put(indexSettings);
|
||||||
|
builder.put(IndexStoreModule.STORE_TYPE, RandomPicks.randomFrom(random, IndexStoreModule.Type.values()));
|
||||||
|
return new FsDirectoryService(builder.build(), indexStore, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper {
|
||||||
|
|
||||||
|
private final boolean crash;
|
||||||
|
private final Set<String> superUnSyncedFiles;
|
||||||
|
private final Random superRandomState;
|
||||||
|
|
||||||
|
public ElasticsearchMockDirectoryWrapper(Random random, Directory delegate, boolean crash) {
|
||||||
|
super(random, delegate);
|
||||||
|
this.crash = crash;
|
||||||
|
|
||||||
|
// TODO: remove all this and cutover to MockFS (DisableFsyncFS) instead
|
||||||
|
try {
|
||||||
|
Field field = MockDirectoryWrapper.class.getDeclaredField("unSyncedFiles");
|
||||||
|
field.setAccessible(true);
|
||||||
|
superUnSyncedFiles = (Set<String>) field.get(this);
|
||||||
|
|
||||||
|
field = MockDirectoryWrapper.class.getDeclaredField("randomState");
|
||||||
|
field.setAccessible(true);
|
||||||
|
superRandomState = (Random) field.get(this);
|
||||||
|
} catch (ReflectiveOperationException roe) {
|
||||||
|
throw new RuntimeException(roe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if {@link #in} must sync its files.
|
||||||
|
* Currently, only {@link org.apache.lucene.store.NRTCachingDirectory} requires sync'ing its files
|
||||||
|
* because otherwise they are cached in an internal {@link org.apache.lucene.store.RAMDirectory}. If
|
||||||
|
* other directories require that too, they should be added to this method.
|
||||||
|
*/
|
||||||
|
private boolean mustSync() {
|
||||||
|
Directory delegate = in;
|
||||||
|
while (delegate instanceof FilterDirectory) {
|
||||||
|
if (delegate instanceof NRTCachingDirectory) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
delegate = ((FilterDirectory) delegate).getDelegate();
|
||||||
|
}
|
||||||
|
return delegate instanceof NRTCachingDirectory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void sync(Collection<String> names) throws IOException {
|
||||||
|
// don't wear out our hardware so much in tests.
|
||||||
|
if (superRandomState.nextInt(100) == 0 || mustSync()) {
|
||||||
|
super.sync(names);
|
||||||
|
} else {
|
||||||
|
superUnSyncedFiles.removeAll(names);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void crash() throws IOException {
|
||||||
|
if (crash) {
|
||||||
|
super.crash();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final class CloseableDirectory implements Closeable {
|
||||||
|
private final BaseDirectoryWrapper dir;
|
||||||
|
private final TestRuleMarkFailure failureMarker;
|
||||||
|
|
||||||
|
public CloseableDirectory(BaseDirectoryWrapper dir) {
|
||||||
|
this.dir = dir;
|
||||||
|
try {
|
||||||
|
final Field suiteFailureMarker = LuceneTestCase.class.getDeclaredField("suiteFailureMarker");
|
||||||
|
suiteFailureMarker.setAccessible(true);
|
||||||
|
this.failureMarker = (TestRuleMarkFailure) suiteFailureMarker.get(LuceneTestCase.class);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new ElasticsearchException("foo", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// We only attempt to check open/closed state if there were no other test
|
||||||
|
// failures.
|
||||||
|
try {
|
||||||
|
if (failureMarker.wasSuccessful() && dir.isOpen()) {
|
||||||
|
Assert.fail("Directory not closed: " + dir);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// TODO: perform real close of the delegate: LUCENE-4058
|
||||||
|
// dir.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue