Add support for Lucene's MockDirectoryWrapper

MockDirectoryWrapper adds asserting logic to the low level directory
implementation that helps to track and catch resource leaks like
unclosed index inputs caused by dangling IndexReader or IndexSearcher
instances. It prevents double writes to files and allows low level
random exceptions to be thrown for testing index consistency etc.

Closes #3654
This commit is contained in:
Simon Willnauer 2013-09-09 17:57:25 +02:00
parent 0e936c99e3
commit fddb7420ae
39 changed files with 1019 additions and 361 deletions

View File

@ -0,0 +1,155 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.store;
import java.io.IOException;
import java.util.Collection;
public class FilterDirectory extends Directory {
protected final Directory in;
public FilterDirectory(Directory in) {
this.in = in;
}
@Override
public String[] listAll() throws IOException {
ensureOpen();
return in.listAll();
}
@Override
public boolean fileExists(String name) throws IOException {
ensureOpen();
return in.fileExists(name);
}
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
in.deleteFile(name);
}
@Override
public long fileLength(String name) throws IOException {
ensureOpen();
return in.fileLength(name);
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
return in.createOutput(name, context);
}
@Override
public void sync(Collection<String> names) throws IOException {
ensureOpen();
in.sync(names);
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
return in.openInput(name, context);
}
@Override
public Lock makeLock(String name) {
ensureOpen();
return in.makeLock(name);
}
@Override
public void clearLock(String name) throws IOException {
ensureOpen();
in.clearLock(name);
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public void setLockFactory(LockFactory lockFactory) throws IOException {
ensureOpen();
in.setLockFactory(lockFactory);
}
@Override
public LockFactory getLockFactory() {
ensureOpen();
return in.getLockFactory();
}
@Override
public String getLockID() {
ensureOpen();
return in.getLockID();
}
@Override
public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
ensureOpen();
in.copy(to, src, dest, context);
}
@Override
public Directory.IndexInputSlicer createSlicer(final String name, final IOContext context) throws IOException {
ensureOpen();
return in.createSlicer(name, context);
}
public Directory getDelegate() {
ensureOpen();
return in;
}
@SuppressWarnings("resource")
final Directory getLeafDirectory() {
Directory current = getDelegate();
while ((current instanceof FilterDirectory)) {
current = ((FilterDirectory) current).getDelegate();
}
return current;
}
public static <T extends Directory> T getLeaf(Directory dir, Class<T> targetClass) {
return getLeaf(dir, targetClass, null);
}
public static <T extends Directory> T getLeaf(Directory dir, Class<T> targetClass, T defaultValue) {
Directory d = dir;
if (dir instanceof FilterDirectory) {
d = ((FilterDirectory) dir).getLeafDirectory();
}
if (targetClass.isAssignableFrom(d.getClass())) {
return targetClass.cast(d);
} else {
return defaultValue;
}
}
@Override
public String toString() {
return in.toString();
}
}

View File

@ -21,10 +21,8 @@ package org.apache.lucene.store;
import org.apache.lucene.store.IOContext.Context;
import java.io.IOException;
import java.util.Collection;
public final class RateLimitedFSDirectory extends Directory {
private final FSDirectory delegate;
public final class RateLimitedFSDirectory extends FilterDirectory {
private final StoreRateLimiting.Provider rateLimitingProvider;
@ -32,43 +30,15 @@ public final class RateLimitedFSDirectory extends Directory {
public RateLimitedFSDirectory(FSDirectory wrapped, StoreRateLimiting.Provider rateLimitingProvider,
StoreRateLimiting.Listener rateListener) {
this.delegate = wrapped;
super(wrapped);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}
public FSDirectory wrappedDirectory() {
return this.delegate;
}
@Override
public String[] listAll() throws IOException {
ensureOpen();
return delegate.listAll();
}
@Override
public boolean fileExists(String name) throws IOException {
ensureOpen();
return delegate.fileExists(name);
}
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
delegate.deleteFile(name);
}
@Override
public long fileLength(String name) throws IOException {
ensureOpen();
return delegate.fileLength(name);
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
final IndexOutput output = delegate.createOutput(name, context);
final IndexOutput output = in.createOutput(name, context);
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
@ -87,58 +57,11 @@ public final class RateLimitedFSDirectory extends Directory {
return output;
}
@Override
public void sync(Collection<String> names) throws IOException {
ensureOpen();
delegate.sync(names);
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
return delegate.openInput(name, context);
}
@Override
public void close() throws IOException {
isOpen = false;
delegate.close();
}
@Override
public IndexInputSlicer createSlicer(String name, IOContext context) throws IOException {
ensureOpen();
return delegate.createSlicer(name, context);
}
@Override
public Lock makeLock(String name) {
ensureOpen();
return delegate.makeLock(name);
}
@Override
public void clearLock(String name) throws IOException {
ensureOpen();
delegate.clearLock(name);
}
@Override
public void setLockFactory(LockFactory lockFactory) throws IOException {
ensureOpen();
delegate.setLockFactory(lockFactory);
}
@Override
public LockFactory getLockFactory() {
ensureOpen();
return delegate.getLockFactory();
}
@Override
public String getLockID() {
ensureOpen();
return delegate.getLockID();
in.close();
}
@Override
@ -147,17 +70,12 @@ public final class RateLimitedFSDirectory extends Directory {
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return StoreUtils.toString(delegate);
return StoreUtils.toString(in);
} else {
return "rate_limited(" + StoreUtils.toString(delegate) + ", type=" + type.name() + ", rate=" + limiter.getMbPerSec() + ")";
return "rate_limited(" + StoreUtils.toString(in) + ", type=" + type.name() + ", rate=" + limiter.getMbPerSec() + ")";
}
}
@Override
public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
ensureOpen();
delegate.copy(to, src, dest, context);
}
static final class RateLimitedIndexOutput extends BufferedIndexOutput {

View File

@ -219,7 +219,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh().force(false));
} catch (Exception e) {
} catch (Throwable e) {
// ignore
}
}

View File

@ -135,7 +135,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private final AtomicInteger flushing = new AtomicInteger();
private final Lock flushLock = new ReentrantLock();
private volatile int onGoingRecoveries = 0;
private final RecoveryCounter onGoingRecoveries = new RecoveryCounter();
// A uid (in the form of BytesRef) to the version map
@ -751,7 +751,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries > 0) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + flush.type() + "] is not allowed");
}
}
@ -769,7 +769,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (onGoingRecoveries > 0) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
// disable refreshing, not dirty
@ -815,7 +815,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (onGoingRecoveries > 0) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
@ -1043,7 +1043,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// this means that next commits will not be allowed once the lock is released
rwl.writeLock().lock();
try {
onGoingRecoveries++;
if (closed) {
throw new EngineClosedException(shardId);
}
onGoingRecoveries.increment();
} finally {
rwl.writeLock().unlock();
}
@ -1052,14 +1055,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
phase1Snapshot = deletionPolicy.snapshot();
} catch (Throwable e) {
--onGoingRecoveries;
onGoingRecoveries.decrement();
throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
}
try {
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
--onGoingRecoveries;
onGoingRecoveries.decrement();
phase1Snapshot.release();
if (closed) {
e = new EngineClosedException(shardId, e);
@ -1071,7 +1074,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
--onGoingRecoveries;
onGoingRecoveries.decrement();
phase1Snapshot.release();
if (closed) {
e = new EngineClosedException(shardId, e);
@ -1082,7 +1085,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
--onGoingRecoveries;
onGoingRecoveries.decrement();
phase1Snapshot.release();
phase2Snapshot.release();
if (closed) {
@ -1099,7 +1102,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} catch (Throwable e) {
throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
} finally {
--onGoingRecoveries;
onGoingRecoveries.decrement();
rwl.writeLock().unlock();
phase1Snapshot.release();
phase2Snapshot.release();
@ -1190,6 +1193,17 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} finally {
rwl.writeLock().unlock();
}
try {
// wait for recoveries to join and close all resources / IO streams
int ongoingRecoveries = onGoingRecoveries.awaitNoRecoveries(5000);
if (ongoingRecoveries > 0) {
logger.debug("Waiting for ongoing recoveries timed out on close currently ongoing disoveries: [{}]", ongoingRecoveries);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@ -1500,4 +1514,32 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
return searcher;
}
}
private static final class RecoveryCounter {
private volatile int ongoingRecoveries = 0;
synchronized void increment() {
ongoingRecoveries++;
}
synchronized void decrement() {
ongoingRecoveries--;
if (ongoingRecoveries == 0) {
notifyAll(); // notify waiting threads - we only wait on ongoingRecoveries == 0
}
assert ongoingRecoveries >= 0 : "ongoingRecoveries must be >= 0 but was: " + ongoingRecoveries;
}
int get() {
// volatile read - no sync needed
return ongoingRecoveries;
}
synchronized int awaitNoRecoveries(long timeout) throws InterruptedException {
if (ongoingRecoveries > 0) { // no loop here - we either time out or we are done!
wait(timeout);
}
return ongoingRecoveries;
}
}
}

View File

@ -92,8 +92,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final Settings indexSettings;
private final NodeEnvironment nodeEnv;
private final ThreadPool threadPool;
private final PluginsService pluginsService;
@ -135,7 +133,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData) {
super(index, indexSettings);
this.injector = injector;
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.indexSettings = indexSettings;
this.analysisService = analysisService;

View File

@ -204,7 +204,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
try {
indexInput.readInt(); // version
return indexInput.readStringStringMap();
} catch (Exception e) {
} catch (Throwable e) {
// failed to load checksums, ignore and return an empty map
return defaultValue;
} finally {
@ -213,8 +213,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public void writeChecksums() throws IOException {
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
ImmutableMap<String, StoreFileMetaData> files = list();
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
synchronized (mutex) {
Map<String, String> checksums = new HashMap<String, String>();
for (StoreFileMetaData metaData : files.values()) {
@ -222,16 +222,23 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
checksums.put(metaData.name(), metaData.checksum());
}
}
while (directory.fileExists(checksumName)) {
checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
}
IndexOutput output = directory.createOutput(checksumName, IOContext.DEFAULT, true);
output.writeInt(0); // version
output.writeStringStringMap(checksums);
output.close();
try {
output.writeInt(0); // version
output.writeStringStringMap(checksums);
} finally {
output.close();
}
}
for (StoreFileMetaData metaData : files.values()) {
if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
try {
directory.deleteFileChecksum(metaData.name());
} catch (Exception e) {
} catch (Throwable e) {
// ignore
}
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.index.store.distributor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RateLimitedFSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.elasticsearch.index.store.DirectoryService;
import java.io.IOException;
@ -53,11 +53,11 @@ public abstract class AbstractDistributor implements Distributor {
}
}
@SuppressWarnings("unchecked")
protected long getUsableSpace(Directory directory) {
if (directory instanceof RateLimitedFSDirectory) {
return ((RateLimitedFSDirectory) directory).wrappedDirectory().getDirectory().getUsableSpace();
} else if (directory instanceof FSDirectory) {
return ((FSDirectory) directory).getDirectory().getUsableSpace();
final FSDirectory leaf = FilterDirectory.getLeaf(directory, FSDirectory.class);
if (leaf != null) {
return leaf.getDirectory().getUsableSpace();
} else {
return 0;
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
@ -48,16 +49,16 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
}
@Override
public long throttleTimeInNanos() {
public final long throttleTimeInNanos() {
return rateLimitingTimeInNanos.count();
}
@Override
public StoreRateLimiting rateLimiting() {
public final StoreRateLimiting rateLimiting() {
return indexStore.rateLimiting();
}
protected LockFactory buildLockFactory() throws IOException {
protected final LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native"));
LockFactory lockFactory = NoLockFactory.getNoLockFactory();
if (fsLock.equals("native")) {
@ -70,10 +71,14 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
}
return lockFactory;
}
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
File directory = ((RateLimitedFSDirectory) dir).wrappedDirectory().getDirectory();
public final void renameFile(Directory dir, String from, String to) throws IOException {
final FSDirectory fsDirectory = FilterDirectory.getLeaf(dir, FSDirectory.class);
if (fsDirectory == null) {
throw new ElasticSearchIllegalArgumentException("Can not rename file on non-filesystem based directory ");
}
File directory = fsDirectory.getDirectory();
File old = new File(directory, from);
File nu = new File(directory, to);
if (nu.exists())
@ -102,8 +107,11 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
}
@Override
public void fullDelete(Directory dir) throws IOException {
FSDirectory fsDirectory = ((RateLimitedFSDirectory) dir).wrappedDirectory();
public final void fullDelete(Directory dir) throws IOException {
final FSDirectory fsDirectory = FilterDirectory.getLeaf(dir, FSDirectory.class);
if (fsDirectory == null) {
throw new ElasticSearchIllegalArgumentException("Can not fully delete on non-filesystem based directory");
}
FileSystemUtils.deleteRecursively(fsDirectory.getDirectory());
// if we are the last ones, delete also the actual index
String[] list = fsDirectory.getDirectory().getParentFile().list();
@ -127,7 +135,7 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
protected abstract FSDirectory newFSDirectory(File location, LockFactory lockFactory) throws IOException;
@Override
public void onPause(long nanos) {
public final void onPause(long nanos) {
rateLimitingTimeInNanos.inc(nanos);
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.indices.store.IndicesStore;
/**
*
*/
public class MmapFsIndexStore extends FsIndexStore {
public final class MmapFsIndexStore extends FsIndexStore {
@Inject
public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {

View File

@ -31,7 +31,7 @@ import org.elasticsearch.indices.store.IndicesStore;
/**
*
*/
public class NioFsIndexStore extends FsIndexStore {
public final class NioFsIndexStore extends FsIndexStore {
@Inject
public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {

View File

@ -31,7 +31,7 @@ import org.elasticsearch.indices.store.IndicesStore;
/**
*
*/
public class SimpleFsIndexStore extends FsIndexStore {
public final class SimpleFsIndexStore extends FsIndexStore {
@Inject
public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.bytebuffer.ByteBufferAllocator;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.apache.lucene.store.bytebuffer.ByteBufferFile;
@ -30,19 +31,18 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
*/
public class ByteBufferDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
public final class ByteBufferDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
private final ByteBufferCache byteBufferCache;
@Inject
public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) {
public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, ByteBufferCache byteBufferCache) {
super(shardId, indexSettings);
this.byteBufferCache = byteBufferCache;
}
@ -59,7 +59,9 @@ public class ByteBufferDirectoryService extends AbstractIndexShardComponent impl
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
((CustomByteBufferDirectory) dir).renameTo(from, to);
CustomByteBufferDirectory leaf = FilterDirectory.getLeaf(dir, CustomByteBufferDirectory.class);
assert leaf != null;
leaf.renameTo(from, to);
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.store.ram;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.RAMFile;
import org.elasticsearch.common.inject.Inject;
@ -34,7 +35,7 @@ import java.io.IOException;
/**
*/
public class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
public final class RamDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
@Inject
public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings) {
@ -53,7 +54,9 @@ public class RamDirectoryService extends AbstractIndexShardComponent implements
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
((CustomRAMDirectory) dir).renameTo(from, to);
CustomRAMDirectory leaf = FilterDirectory.getLeaf(dir, CustomRAMDirectory.class);
assert leaf != null;
leaf.renameTo(from, to);
}
@Override

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.RoutingNode;
@ -47,7 +48,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -158,7 +158,7 @@ public class RecoverySource extends AbstractComponent {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
int fileIndex = 0;
for (final String name : response.phase1FileNames) {
ThreadPoolExecutor pool;
@ -203,16 +203,10 @@ public class RecoverySource extends AbstractComponent {
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead;
}
} catch (Exception e) {
} catch (Throwable e) {
lastException.set(e);
} finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
IOUtils.closeWhileHandlingException(indexInput);
latch.countDown();
}
}

View File

@ -23,8 +23,12 @@ import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@ -52,10 +56,10 @@ public class RecoveryStatus {
}
volatile Thread recoveryThread;
volatile boolean canceled;
private volatile boolean canceled;
volatile boolean sentCanceledToSource;
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private volatile ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
ConcurrentMap<String, String> checksums = ConcurrentCollections.newConcurrentMap();
final long startTime = System.currentTimeMillis();
@ -98,4 +102,50 @@ public class RecoveryStatus {
public long currentFilesSize() {
return currentFilesSize.get();
}
public boolean isCanceled() {
return canceled;
}
public synchronized void cancel() {
canceled = true;
}
public IndexOutput getOpenIndexOutput(String key) {
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
if (canceled || outputs == null) {
return null;
}
return outputs.get(key);
}
public synchronized Set<Entry<String, IndexOutput>> cancleAndClearOpenIndexInputs() {
cancel();
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
openIndexOutputs = null;
if (outputs == null) {
return null;
}
Set<Entry<String, IndexOutput>> entrySet = outputs.entrySet();
return entrySet;
}
public IndexOutput removeOpenIndexOutputs(String name) {
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
if (outputs == null) {
return null;
}
return outputs.remove(name);
}
public synchronized IndexOutput openAndPutIndexOutput(String key, String name, Store store) throws IOException {
if (isCanceled()) {
return null;
}
final ConcurrentMap<String, IndexOutput> outputs = openIndexOutputs;
IndexOutput indexOutput = store.createOutputRaw(name);
outputs.put(key, indexOutput);
return indexOutput;
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
import com.google.common.collect.Sets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
@ -46,10 +47,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.Map.Entry;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -127,23 +126,28 @@ public class RecoveryTarget extends AbstractComponent {
if (recoveryStatus.sentCanceledToSource) {
return;
}
recoveryStatus.canceled = true;
if (recoveryStatus.recoveryThread != null) {
recoveryStatus.recoveryThread.interrupt();
}
long time = System.currentTimeMillis();
// give it a grace period of actually getting the sent ack part
while (!recoveryStatus.sentCanceledToSource) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
recoveryStatus.cancel();
try {
if (recoveryStatus.recoveryThread != null) {
recoveryStatus.recoveryThread.interrupt();
}
if (System.currentTimeMillis() - time > 10000) {
break;
// give it a grace period of actually getting the sent ack part
final long sleepTime = 100;
final long maxSleepTime = 10000;
long rounds = Math.round(maxSleepTime / sleepTime);
while (!recoveryStatus.sentCanceledToSource && rounds > 0) {
rounds--;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break; // interrupted - step out!
}
}
} finally {
removeAndCleanOnGoingRecovery(recoveryStatus);
}
removeAndCleanOnGoingRecovery(recoveryStatus);
}
public void startRecovery(final StartRecoveryRequest request, final InternalIndexShard indexShard, final RecoveryListener listener) {
@ -188,7 +192,7 @@ public class RecoveryTarget extends AbstractComponent {
listener.onIgnoreRecovery(false, "local shard closed, stop recovery");
return;
}
if (recoveryStatus.canceled) {
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
@ -230,9 +234,9 @@ public class RecoveryTarget extends AbstractComponent {
}
removeAndCleanOnGoingRecovery(recoveryStatus);
listener.onRecoveryDone();
} catch (Exception e) {
} catch (Throwable e) {
// logger.trace("[{}][{}] Got exception on recovery", e, request.shardId().index().name(), request.shardId().id());
if (recoveryStatus.canceled) {
if (recoveryStatus.isCanceled()) {
// don't remove it, the cancellation code will remove it...
listener.onIgnoreRecovery(false, "canceled recovery");
return;
@ -335,18 +339,18 @@ public class RecoveryTarget extends AbstractComponent {
}
// just mark it as canceled as well, just in case there are in flight requests
// coming from the recovery target
status.canceled = true;
status.cancel();
// clean open index outputs
for (Map.Entry<String, IndexOutput> entry : status.openIndexOutputs.entrySet()) {
Set<Entry<String, IndexOutput>> entrySet = status.cancleAndClearOpenIndexInputs();
Iterator<Entry<String, IndexOutput>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
synchronized (entry.getValue()) {
try {
entry.getValue().close();
} catch (Exception e) {
// ignore
}
IOUtils.closeWhileHandlingException(entry.getValue());
}
}
status.openIndexOutputs = null;
iterator.remove();
}
status.checksums = null;
}
@ -369,7 +373,7 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -400,7 +404,7 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -433,14 +437,14 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -470,7 +474,7 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -505,7 +509,7 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -577,7 +581,7 @@ public class RecoveryTarget extends AbstractComponent {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.canceled) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
@ -588,14 +592,8 @@ public class RecoveryTarget extends AbstractComponent {
if (request.position() == 0) {
// first request
onGoingRecovery.checksums.remove(request.name());
indexOutput = onGoingRecovery.openIndexOutputs.remove(request.name());
if (indexOutput != null) {
try {
indexOutput.close();
} catch (IOException e) {
// ignore
}
}
indexOutput = onGoingRecovery.removeOpenIndexOutputs(request.name());
IOUtils.closeWhileHandlingException(indexOutput);
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
@ -604,20 +602,19 @@ public class RecoveryTarget extends AbstractComponent {
// we only want to overwrite the index files once we copied all over, and not create a
// case where the index is half moved
String name = request.name();
if (store.directory().fileExists(name)) {
name = "recovery." + onGoingRecovery.startTime + "." + name;
String fileName = request.name();
if (store.directory().fileExists(fileName)) {
fileName = "recovery." + onGoingRecovery.startTime + "." + fileName;
}
indexOutput = store.createOutputRaw(name);
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
indexOutput = onGoingRecovery.openAndPutIndexOutput(request.name(), fileName, store);
} else {
indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
indexOutput = onGoingRecovery.getOpenIndexOutput(request.name());
}
if (indexOutput == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
boolean success = false;
synchronized (indexOutput) {
try {
if (recoverySettings.rateLimiter() != null) {
@ -637,18 +634,23 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.checksums.put(request.name(), request.checksum());
}
store.directory().sync(Collections.singleton(request.name()));
onGoingRecovery.openIndexOutputs.remove(request.name());
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
assert remove == indexOutput;
}
} catch (IOException e) {
onGoingRecovery.openIndexOutputs.remove(request.name());
try {
indexOutput.close();
} catch (IOException e1) {
// ignore
}
throw e;
success = true;
} finally {
if (!success || onGoingRecovery.isCanceled()) {
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
assert remove == indexOutput;
IOUtils.closeWhileHandlingException(indexOutput);
}
}
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

View File

@ -566,7 +566,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
}
}
private static final int[] EMPTY_DOC_IDS = new int[0];
/**

View File

@ -19,10 +19,7 @@
package org.apache.lucene.util;
import com.carrotsearch.randomizedtesting.JUnit4MethodProvider;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.*;
import com.carrotsearch.randomizedtesting.annotations.Listeners;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.annotations.TestMethodProviders;
@ -286,6 +283,7 @@ public class AbstractRandomizedTest extends RandomizedTest {
@Before
public void setUp() throws Exception {
parentChainCallRule.setupCalled = true;
currentSeed = SeedUtils.parseSeed(getContext().getRunnerSeedAsString());
}
/**
@ -294,6 +292,7 @@ public class AbstractRandomizedTest extends RandomizedTest {
@After
public void tearDown() throws Exception {
parentChainCallRule.teardownCalled = true;
currentSeed = null;
}
@ -334,4 +333,10 @@ public class AbstractRandomizedTest extends RandomizedTest {
public String getTestName() {
return threadAndTestNameRule.testMethodName;
}
private static volatile Long currentSeed;
public static Long getCurrentSeed() {
return currentSeed;
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.client.AdminClient;
@ -54,6 +55,7 @@ import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.junit.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
@ -99,7 +101,7 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
}
@After
public void after() {
public void after() throws IOException {
logger.info("Cleaning up after test.");
MetaData metaData = client().admin().cluster().prepareState().execute().actionGet().getState().getMetaData();
assertThat("test leaves persistent cluster metadata behind: " + metaData.persistentSettings().getAsMap(), metaData
@ -108,6 +110,7 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
.persistentSettings().getAsMap().size(), equalTo(0));
wipeIndices(); // wipe after to make sure we fail in the test that didn't ack the delete
wipeTemplates();
ensureAllFilesClosed();
}
public static TestCluster cluster() {
@ -392,5 +395,11 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
client().admin().indices().prepareRefresh(index).execute().get();
}
}
public void clearScroll(String... scrollIds) {
ClearScrollResponse clearResponse = client().prepareClearScroll()
.setScrollIds(Arrays.asList(scrollIds)).get();
assertThat(clearResponse.isSucceeded(), equalTo(true));
}
}

View File

@ -18,19 +18,21 @@
*/
package org.elasticsearch;
import com.carrotsearch.randomizedtesting.annotations.Listeners;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.*;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import com.google.common.base.Predicate;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.store.mock.MockDirectoryHelper;
import org.elasticsearch.junit.listeners.LoggingListener;
import org.junit.BeforeClass;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@ -44,7 +46,7 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
protected final ESLogger logger = Loggers.getLogger(getClass());
public static final String CHILD_VM_ID = System.getProperty("junit4.childvm.id", "" + System.currentTimeMillis());
public void awaitBusy(Predicate<?> breakPredicate) throws InterruptedException {
awaitBusy(breakPredicate, 10, TimeUnit.SECONDS);
}
@ -65,7 +67,6 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
timeInMillis = maxTimeInMillis - sum;
Thread.sleep(Math.max(timeInMillis, 0));
return breakPredicate.apply(null);
}
private static final String[] numericTypes = new String[] {"byte", "short", "integer", "long"};
@ -85,5 +86,40 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
URI uri = URI.create(getClass().getResource(relativePath).toString());
return new File(uri);
}
public static void ensureAllFilesClosed() throws IOException {
try {
for (MockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
if (w.isOpen()) {
w.close();
}
}
} finally {
forceClearMockWrappers();
}
}
public static void forceClearMockWrappers() {
MockDirectoryHelper.wrappers.clear();
}
public static boolean hasUnclosedWrapper() {
for (MockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
if (w.isOpen()) {
return true;
}
}
return false;
}
@BeforeClass
public static void registerMockDirectoryHooks() throws Exception {
closeAfterSuite(new Closeable() {
@Override
public void close() throws IOException {
ensureAllFilesClosed();
}
});
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.store.mock.MockFSIndexStoreModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.transport.TransportService;
@ -68,13 +69,12 @@ public class TestCluster {
private final AtomicBoolean open = new AtomicBoolean(true);
private final Settings defaultSettings;
private NodeAndClient clientNode;
private Random random;
private ClientFactory clientFactory;
private AtomicInteger nextNodeId = new AtomicInteger(0);
@ -97,7 +97,10 @@ public class TestCluster {
// decrease the routing schedule so new nodes will be added quickly
defaultSettings = settingsBuilder().put(defaultSettings).put("cluster.routing.schedule", "50ms").build();
}
this.defaultSettings = ImmutableSettings.settingsBuilder().put(defaultSettings).put("cluster.name", clusterName).build();
// TODO once we are reproducible here use MockRamIndexStoreModule
this.defaultSettings = ImmutableSettings.settingsBuilder()
.put("index.store.type", MockFSIndexStoreModule.class.getName())
.put(defaultSettings).put("cluster.name", clusterName).build();
}

View File

@ -1,5 +1,6 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.ElasticsearchTestCase;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
@ -17,14 +18,13 @@ import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class SameShardRoutingTests {
public class SameShardRoutingTests extends ElasticsearchTestCase {
private final ESLogger logger = Loggers.getLogger(SameShardRoutingTests.class);

View File

@ -0,0 +1,126 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import com.carrotsearch.randomizedtesting.SeedUtils;
import org.apache.lucene.store.*;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.Constants;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.fs.FsDirectoryService;
import org.elasticsearch.index.store.fs.MmapFsDirectoryService;
import org.elasticsearch.index.store.fs.NioFsDirectoryService;
import org.elasticsearch.index.store.fs.SimpleFsDirectoryService;
import org.elasticsearch.index.store.memory.ByteBufferDirectoryService;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import java.util.Random;
import java.util.Set;
public class MockDirectoryHelper {
public static final String RANDOM_IO_EXCEPTION_RATE = "store.mock.random.io_exception_rate";
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "store.mock.random.io_exception_rate_on_open";
public static final String RANDOM_SEED = "store.mock.random.seed";
public static final String RANDOM_THROTTLE = "store.mock.random.throttle";
public static final String CHECK_INDEX_ON_CLOSE = "store.mock.check_index_on_close";
public static final Set<MockDirectoryWrapper> wrappers = ConcurrentCollections.newConcurrentSet();
private final Random random;
private final double randomIOExceptionRate;
private final double randomIOExceptionRateOnOpen;
private final Throttling throttle;
private final boolean checkIndexOnClose;
private Settings indexSettings;
private ShardId shardId;
public MockDirectoryHelper(ShardId shardId, Settings indexSettings, ESLogger logger) {
randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d);
randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d);
final Long currentSeed = AbstractRandomizedTest.getCurrentSeed();
assert currentSeed != null;
final long seed = indexSettings.getAsLong(RANDOM_SEED, currentSeed);
random = new Random(seed);
random.nextInt(shardId.getId() + 1); // some randomness per shard
throttle = Throttling.valueOf(indexSettings.get(RANDOM_THROTTLE, random.nextDouble() < 0.1 ? "SOMETIMES" : "NEVER"));
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, random.nextDouble() < 0.1);
if (logger.isDebugEnabled()) {
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] checkIndexOnClose: [{}]", SeedUtils.formatSeed(seed),
throttle, checkIndexOnClose);
}
this.indexSettings = indexSettings;
this.shardId = shardId;
}
public Directory wrap(Directory dir) {
final MockDirectoryWrapper w = new MockDirectoryWrapper(random, dir);
w.setRandomIOExceptionRate(randomIOExceptionRate);
w.setRandomIOExceptionRateOnOpen(randomIOExceptionRateOnOpen);
w.setThrottling(throttle);
w.setCheckIndexOnClose(checkIndexOnClose);
wrappers.add(w);
return new FilterDirectory(w) {
@Override
public Directory getDelegate() {
// TODO we should port this FilterDirectory to Lucene
return w.getDelegate();
}
};
}
public Directory[] wrapAllInplace(Directory[] dirs) {
for (int i = 0; i < dirs.length; i++) {
dirs[i] = wrap(dirs[i]);
}
return dirs;
}
public FsDirectoryService randomDirectorService(IndexStore indexStore) {
if ((Constants.WINDOWS || Constants.SUN_OS) && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return new MmapFsDirectoryService(shardId, indexSettings, indexStore);
} else if (Constants.WINDOWS) {
return new SimpleFsDirectoryService(shardId, indexSettings, indexStore);
}
switch (random.nextInt(3)) {
case 1:
return new MmapFsDirectoryService(shardId, indexSettings, indexStore);
case 0:
return new SimpleFsDirectoryService(shardId, indexSettings, indexStore);
default:
return new NioFsDirectoryService(shardId, indexSettings, indexStore);
}
}
public DirectoryService randomRamDirecoryService(ByteBufferCache byteBufferCache) {
switch (random.nextInt(2)) {
case 0:
return new RamDirectoryService(shardId, indexSettings);
default:
return new ByteBufferDirectoryService(shardId, indexSettings, byteBufferCache);
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.fs.FsDirectoryService;
import java.io.File;
import java.io.IOException;
public class MockFSDirectoryService extends FsDirectoryService {
private final MockDirectoryHelper helper;
private FsDirectoryService delegateService;
@Inject
public MockFSDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings, indexStore);
helper = new MockDirectoryHelper(shardId, indexSettings, logger);
delegateService = helper.randomDirectorService(indexStore);
}
@Override
public Directory[] build() throws IOException {
return helper.wrapAllInplace(delegateService.build());
}
@Override
protected synchronized FSDirectory newFSDirectory(File location, LockFactory lockFactory) throws IOException {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.fs.FsIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
public class MockFSIndexStore extends FsIndexStore {
@Inject
public MockFSIndexStore(Index index, Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService, indicesStore, nodeEnv);
}
@Override
public Class<? extends DirectoryService> shardDirectory() {
return MockFSDirectoryService.class;
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.store.IndexStore;
public class MockFSIndexStoreModule extends AbstractModule {
@Override
protected void configure() {
bind(IndexStore.class).to(MockFSIndexStore.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import java.io.IOException;
public class MockRamDirecorySerivce extends AbstractIndexShardComponent implements DirectoryService {
private final MockDirectoryHelper helper;
private final DirectoryService delegateService;
@Inject
public MockRamDirecorySerivce(ShardId shardId, Settings indexSettings, ByteBufferCache byteBufferCache) {
super(shardId, indexSettings);
helper = new MockDirectoryHelper(shardId, indexSettings, logger);
delegateService = helper.randomRamDirecoryService(byteBufferCache);
}
@Override
public Directory[] build() throws IOException {
return helper.wrapAllInplace(delegateService.build());
}
@Override
public long throttleTimeInNanos() {
return delegateService.throttleTimeInNanos();
}
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
delegateService.renameFile(dir, from, to);
}
@Override
public void fullDelete(Directory dir) throws IOException {
delegateService.fullDelete(dir);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats;
public class MockRamIndexStore extends AbstractIndexStore{
@Inject
public MockRamIndexStore(Index index, Settings indexSettings, IndexService indexService, IndicesStore indicesStore) {
super(index, indexSettings, indexService, indicesStore);
}
@Override
public boolean persistent() {
return false;
}
@Override
public Class<? extends DirectoryService> shardDirectory() {
return MockRamDirecorySerivce.class;
}
@Override
public ByteSizeValue backingStoreTotalSpace() {
return JvmInfo.jvmInfo().getMem().heapMax();
}
@Override
public ByteSizeValue backingStoreFreeSpace() {
return JvmStats.jvmStats().getMem().heapUsed();
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.mock;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.store.IndexStore;
public class MockRamIndexStoreModule extends AbstractModule {
@Override
protected void configure() {
bind(IndexStore.class).to(MockRamIndexStore.class).asEagerSingleton();
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.percolator;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.AbstractNodesTests;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
@ -37,7 +39,6 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.junit.annotations.TestLogging;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.AbstractNodesTests;
import org.junit.After;
import org.junit.Test;
@ -50,9 +51,9 @@ import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.percolator.PercolatorTests.convertFromTextArray;
import static org.elasticsearch.percolator.TTLPercolatorTests.ensureGreen;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.*;
public class RecoveryPercolatorTests extends AbstractNodesTests {
@ -79,6 +80,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
@Test
@Slow
public void testRestartNodePercolator1() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1");
@ -129,6 +131,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
@Test
@Slow
public void testRestartNodePercolator2() throws Exception {
logger.info("--> cleaning nodes");
buildNode("node1");
@ -212,6 +215,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
@Test
@Slow
public void testLoadingPercolateQueriesDuringCloseAndOpen() throws Exception {
Settings settings = settingsBuilder()
.put("gateway.type", "local").build();
@ -268,6 +272,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
}
@Test
@Slow
public void testSinglePercolator_recovery() throws Exception {
percolatorRecovery(false);
}
@ -275,6 +280,7 @@ public class RecoveryPercolatorTests extends AbstractNodesTests {
@Test
// Need to omit org.elast
@TestLogging("action.percolate:TRACE")
@Slow
public void testMultiPercolator_recovery() throws Exception {
percolatorRecovery(true);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.basic;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -44,21 +45,25 @@ public class SearchWhileCreatingIndexTests extends AbstractSharedClusterTest {
}
@Test
@Slow
public void testIndexCausesIndexCreation() throws Exception {
searchWhileCreatingIndex(-1, 1); // 1 replica in our default...
}
@Test
@Slow
public void testNoReplicas() throws Exception {
searchWhileCreatingIndex(10, 0);
}
@Test
@Slow
public void testOneReplica() throws Exception {
searchWhileCreatingIndex(10, 1);
}
@Test
@Slow
public void testTwoReplicas() throws Exception {
searchWhileCreatingIndex(10, 2);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.facet;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@ -264,6 +265,7 @@ public class SimpleFacetsTests extends AbstractSharedClusterTest {
@Test
@Slow
public void testConcurrentFacets() throws ElasticSearchException, IOException, InterruptedException, ExecutionException {
prepareCreate("test")
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
@ -427,6 +429,7 @@ public class SimpleFacetsTests extends AbstractSharedClusterTest {
}
@Test
@Slow
public void testDuelByteFieldDataImpl() throws ElasticSearchException, IOException, InterruptedException, ExecutionException {
prepareCreate("test")
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")

View File

@ -28,6 +28,7 @@ import org.apache.lucene.spatial.prefix.tree.GeohashPrefixTree;
import org.apache.lucene.spatial.query.SpatialArgs;
import org.apache.lucene.spatial.query.SpatialOperation;
import org.apache.lucene.spatial.query.UnsupportedSpatialOperation;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
@ -80,11 +81,6 @@ public class GeoFilterTests extends AbstractSharedClusterTest {
withinSupport = testRelationSupport(SpatialOperation.IsWithin);
}
@Override
protected int numberOfNodes() {
return 2;
}
private static byte[] unZipData(String path) throws IOException {
InputStream is = Streams.class.getResourceAsStream(path);
if (is == null) {
@ -394,6 +390,7 @@ public class GeoFilterTests extends AbstractSharedClusterTest {
}
@Test
@Slow
public void bulktest() throws Exception {
byte[] bulkAction = unZipData("/org/elasticsearch/search/geo/gzippedmap.json");

View File

@ -44,7 +44,6 @@ import java.util.Arrays;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.elasticsearch.client.Requests.searchRequest;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.highlight;
@ -606,7 +605,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.field(new HighlightBuilder.Field("field1"))
.field(new HighlightBuilder.Field("field2").preTags("<field2>").postTags("</field2>")));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -629,7 +628,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field*").order("score").preTags("<xxx>").postTags("</xxx>"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -652,7 +651,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field1").order("score").preTags("<xxx>").postTags("</xxx>"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -664,7 +663,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field1").order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -676,7 +675,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -688,7 +687,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -700,7 +699,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -712,7 +711,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
assertThat(searchResponse.getHits().getAt(0).highlightFields().get("field2").fragments()[0].string(), equalTo("The <xxx>quick</xxx> brown fox jumps over the lazy dog"));
@ -733,7 +732,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field1", 100, 0).order("score").preTags("<xxx>").postTags("</xxx>"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -745,7 +744,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field1", 100, 0).order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -758,7 +757,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2", 100, 0).order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -771,7 +770,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2", 100, 0).order("score").preTags("<xxx>").postTags("</xxx>"));
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -1258,7 +1257,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<x>").postTags("</x>"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -1280,7 +1279,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.highlight(highlight().field("field2").order("score").preTags("<x>").postTags("</x>"));
SearchResponse searchResponse = client().search(
searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
@ -1305,11 +1304,11 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.from(0).size(60).explain(true)
.highlight(highlight().field("field2").order("score").preTags("<x>").postTags("</x>"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
assertThat(searchResponse.getHits().getAt(0).highlightFields().get("field2").fragments()[0].string(), equalTo("The <x>quick</x> <x>brown</x> fox jumps over the lazy dog"));
SearchResponse searchResponse = client().search(searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
// assertNoFailures(searchResponse);
// assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
//
// assertThat(searchResponse.getHits().getAt(0).highlightFields().get("field2").fragments()[0].string(), equalTo("The <x>quick</x> <x>brown</x> fox jumps over the lazy dog"));
}
@Test
@ -1325,7 +1324,7 @@ public class HighlighterSearchTests extends AbstractSharedClusterTest {
.explain(true).highlight(highlight().field("field2").order("score").preTags("<x>").postTags("</x>"));
SearchResponse searchResponse = client().search(
searchRequest("test").source(source).searchType(QUERY_THEN_FETCH).scroll(timeValueMinutes(10))).actionGet();
searchRequest("test").source(source).searchType(QUERY_THEN_FETCH)).actionGet();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.rescore;
import org.apache.lucene.util.English;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
@ -448,6 +449,7 @@ public class QueryRescorerTests extends AbstractSharedClusterTest {
@Test
@Slow
public void testScoring_withFunctionScore() throws Exception {
client().admin()
.indices()

View File

@ -74,23 +74,26 @@ public class SearchScanScrollingTests extends AbstractSharedClusterTest {
.setSize(size)
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs));
// start scrolling, until we get not results
while (true) {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
try {
assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs));
assertThat(searchResponse.getFailedShards(), equalTo(0));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.getHits().hits().length == 0) {
break;
// start scrolling, until we get not results
while (true) {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
assertThat(searchResponse.getHits().totalHits(), equalTo(numberOfDocs));
assertThat(searchResponse.getFailedShards(), equalTo(0));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.getHits().hits().length == 0) {
break;
}
}
assertThat(expectedIds, equalTo(ids));
} finally {
clearScroll(searchResponse.getScrollId());
}
assertThat(expectedIds, equalTo(ids));
}
}

View File

@ -43,11 +43,6 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
@Test
public void testSimpleScrollQueryThenFetch() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
@ -65,43 +60,41 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
long counter = 0;
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(30));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
try {
long counter = 0;
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(30));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
} finally {
clearScroll(searchResponse.getScrollId());
}
}
@Test
public void testSimpleScrollQueryThenFetchSmallSizeUnevenDistribution() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
@ -126,57 +119,56 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
long counter = 0;
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(3));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
for (int i = 0; i < 32; i++) {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
try {
long counter = 0;
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(3));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
}
// and now, the last one is one
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(1));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
// a the last is zero
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(0));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
for (int i = 0; i < 32; i++) {
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(3));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
}
// and now, the last one is one
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(1));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
// a the last is zero
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse.getHits().hits().length, equalTo(0));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter++));
}
} finally {
clearScroll(searchResponse.getScrollId());
}
}
@Test
public void testScrollAndUpdateIndex() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 5)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
@ -199,23 +191,25 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("postDate", SortOrder.ASC)
.execute().actionGet();
do {
for (SearchHit searchHit : searchResponse.getHits().hits()) {
Map<String, Object> map = searchHit.sourceAsMap();
map.put("message", "update");
client().prepareIndex("test", "tweet", searchHit.id()).setSource(map).execute().actionGet();
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
} while (searchResponse.getHits().hits().length > 0);
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(500l));
assertThat(client().prepareCount().setQuery(termQuery("message", "test")).execute().actionGet().getCount(), equalTo(0l));
assertThat(client().prepareCount().setQuery(termQuery("message", "test")).execute().actionGet().getCount(), equalTo(0l));
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
try {
do {
for (SearchHit searchHit : searchResponse.getHits().hits()) {
Map<String, Object> map = searchHit.sourceAsMap();
map.put("message", "update");
client().prepareIndex("test", "tweet", searchHit.id()).setSource(map).execute().actionGet();
}
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
} while (searchResponse.getHits().hits().length > 0);
client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(), equalTo(500l));
assertThat(client().prepareCount().setQuery(termQuery("message", "test")).execute().actionGet().getCount(), equalTo(0l));
assertThat(client().prepareCount().setQuery(termQuery("message", "test")).execute().actionGet().getCount(), equalTo(0l));
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
} finally {
clearScroll(searchResponse.getScrollId());
}
}
@Test
@ -391,5 +385,4 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(0l));
assertThat(searchResponse2.getHits().hits().length, equalTo(0));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.suggest;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
@ -392,6 +393,7 @@ public class SuggestSearchTests extends AbstractSharedClusterTest {
@Test
@Slow
public void testMarvelHerosPhraseSuggest() throws ElasticSearchException, IOException {
CreateIndexRequestBuilder builder = prepareCreate("test").setSettings(settingsBuilder()
.put("index.analysis.analyzer.reverse.tokenizer", "standard")

View File

@ -19,6 +19,7 @@
package org.elasticsearch.update;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
@ -445,6 +446,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
}
@Test
@Slow
public void testConcurrentUpdateWithRetryOnConflict() throws Exception {
final boolean useBulkApi = randomBoolean();
createIndex();