Close Directory / Store once all resources have been released

Currently we close the store and therefor the underlying directory
when the engine / shard is closed ie. during relocation etc. We also
just close it while there are still searches going on and/or we are
recovering from it. The recoveries might fail which is ok but searches
etc. will be working like pending fetch phases.

The contract of the Directory doesn't prevent to read from a stream
that was already opened before the Directory was closed but from a
system boundary perspective and from lifecycles that we test it seems
to be the right thing to do to wait until all resources are released.

Additionally it will also help to make sure everything is closed
properly before directories are closed itself.

Note: this commit adds Object#wait & Object@#notify/All to forbidden APIs

Closes #5432
This commit is contained in:
Simon Willnauer 2013-12-14 20:46:16 +01:00
parent ec8f404ac7
commit 2398bb4f1c
18 changed files with 729 additions and 509 deletions

View File

@ -33,3 +33,10 @@ org.apache.lucene.index.IndexWriter#forceMergeDeletes(boolean) @ use Merges#forc
@defaultMessage QueryWrapperFilter is cachable by default - use Queries#wrap instead
org.apache.lucene.search.QueryWrapperFilter#<init>(org.apache.lucene.search.Query)
@defaultMessage Only use wait / notify when really needed try to use concurrency primitives, latches or callbacks instead.
java.lang.Object#wait()
java.lang.Object#wait(long)
java.lang.Object#wait(long,int)
java.lang.Object#notify()
java.lang.Object#notifyAll()

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.blobstore.fs;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
@ -50,11 +51,13 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
raf = new RandomAccessFile(file, "rw");
// clean the file if it exists
raf.setLength(0);
} catch (Exception e) {
} catch (Throwable e) {
listener.onFailure(e);
return;
}
boolean success = false;
try {
boolean innerSuccess = false;
try {
long bytesWritten = 0;
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
@ -67,21 +70,18 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
listener.onFailure(new ElasticsearchIllegalStateException("[" + blobName + "]: wrote [" + bytesWritten + "], expected to write [" + sizeInBytes + "]"));
return;
}
innerSuccess = true;
} finally {
try {
is.close();
} catch (IOException ex) {
// do nothing
}
try {
raf.close();
} catch (IOException ex) {
// do nothing
if (innerSuccess) {
IOUtils.close(is, raf);
} else {
IOUtils.closeWhileHandlingException(is, raf);
}
}
FileSystemUtils.syncFile(file);
listener.onCompleted();
} catch (Exception e) {
success = true;
} catch (Throwable e) {
listener.onFailure(e);
// just on the safe size, try and delete it on failure
try {
if (file.exists()) {
@ -90,7 +90,10 @@ public class FsImmutableBlobContainer extends AbstractFsBlobContainer implements
} catch (Exception e1) {
// ignore
}
listener.onFailure(e);
} finally {
if (success) {
listener.onCompleted();
}
}
}
});

View File

@ -37,6 +37,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
@ -199,6 +201,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (failOnMergeFailure) {
this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
}
store.incRef();
}
@Override
@ -345,14 +348,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try {
docIdAndVersion = Versions.loadDocIdAndVersion(searcher.reader(), get.uid());
} catch (Throwable e) {
searcher.release();
Releasables.releaseWhileHandlingException(searcher);
//TODO: A better exception goes here
throw new EngineException(shardId(), "Couldn't resolve version", e);
}
if (get.version() != Versions.MATCH_ANY && docIdAndVersion != null) {
if (get.versionType().isVersionConflict(docIdAndVersion.version, get.version())) {
searcher.release();
Releasables.release(searcher);
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
}
@ -362,7 +365,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// don't release the searcher on this path, it is the responsability of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
} else {
searcher.release();
Releasables.release(searcher);
return GetResult.NOT_EXISTS;
}
@ -1005,25 +1008,22 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
@Override
public <T> T snapshot(SnapshotHandler<T> snapshotHandler) throws EngineException {
SnapshotIndexCommit snapshotIndexCommit = null;
Translog.Snapshot traslogSnapshot = null;
Translog.Snapshot translogSnapshot = null;
rwl.readLock().lock();
try {
snapshotIndexCommit = deletionPolicy.snapshot();
traslogSnapshot = translog.snapshot();
translogSnapshot = translog.snapshot();
} catch (Throwable e) {
if (snapshotIndexCommit != null) {
snapshotIndexCommit.release();
}
Releasables.releaseWhileHandlingException(snapshotIndexCommit);
throw new SnapshotFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
}
try {
return snapshotHandler.snapshot(snapshotIndexCommit, traslogSnapshot);
return snapshotHandler.snapshot(snapshotIndexCommit, translogSnapshot);
} finally {
snapshotIndexCommit.release();
traslogSnapshot.release();
Releasables.release(snapshotIndexCommit, translogSnapshot);
}
}
@ -1050,7 +1050,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (closed) {
throw new EngineClosedException(shardId);
}
onGoingRecoveries.increment();
onGoingRecoveries.startRecovery();
} finally {
rwl.writeLock().unlock();
}
@ -1059,15 +1059,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try {
phase1Snapshot = deletionPolicy.snapshot();
} catch (Throwable e) {
onGoingRecoveries.decrement();
Releasables.releaseWhileHandlingException(onGoingRecoveries);
throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
}
try {
recoveryHandler.phase1(phase1Snapshot);
} catch (Throwable e) {
onGoingRecoveries.decrement();
phase1Snapshot.release();
Releasables.releaseWhileHandlingException(onGoingRecoveries, phase1Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
@ -1078,8 +1077,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try {
phase2Snapshot = translog.snapshot();
} catch (Throwable e) {
onGoingRecoveries.decrement();
phase1Snapshot.release();
Releasables.releaseWhileHandlingException(onGoingRecoveries, phase1Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
@ -1089,9 +1087,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Throwable e) {
onGoingRecoveries.decrement();
phase1Snapshot.release();
phase2Snapshot.release();
Releasables.releaseWhileHandlingException(onGoingRecoveries, phase1Snapshot, phase2Snapshot);
if (closed) {
e = new EngineClosedException(shardId, e);
}
@ -1100,19 +1096,17 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
rwl.writeLock().lock();
Translog.Snapshot phase3Snapshot = null;
boolean success = false;
try {
phase3Snapshot = translog.snapshot(phase2Snapshot);
recoveryHandler.phase3(phase3Snapshot);
success = true;
} catch (Throwable e) {
throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
} finally {
onGoingRecoveries.decrement();
Releasables.release(success, onGoingRecoveries);
rwl.writeLock().unlock();
phase1Snapshot.release();
phase2Snapshot.release();
if (phase3Snapshot != null) {
phase3Snapshot.release();
}
Releasables.release(success, phase1Snapshot, phase2Snapshot, phase3Snapshot);
}
}
@ -1233,17 +1227,6 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
} finally {
rwl.writeLock().unlock();
}
try {
// wait for recoveries to join and close all resources / IO streams
int ongoingRecoveries = onGoingRecoveries.awaitNoRecoveries(5000);
if (ongoingRecoveries > 0) {
logger.debug("Waiting for ongoing recoveries timed out on close currently ongoing disoveries: [{}]", ongoingRecoveries);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@ -1263,36 +1246,37 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, failure);
}
// TODO - should we acquire the writeLock here?
innerClose();
}
}
private void innerClose() {
if (closed) {
return;
}
indexSettingsService.removeListener(applySettings);
closed = true;
this.versionMap.clear();
this.failedEngineListeners.clear();
try {
if (!closed) {
try {
IOUtils.close(searcherManager);
} catch (Throwable t) {
logger.warn("Failed to close SearcherManager", t);
}
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
if (indexWriter != null) {
closed = true;
indexSettingsService.removeListener(applySettings);
this.versionMap.clear();
this.failedEngineListeners.clear();
try {
indexWriter.rollback();
} catch (AlreadyClosedException e) {
// ignore
IOUtils.close(searcherManager);
} catch (Throwable t) {
logger.warn("Failed to close SearcherManager", t);
}
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
if (indexWriter != null) {
try {
indexWriter.rollback();
} catch (AlreadyClosedException e) {
// ignore
}
}
} catch (Throwable e) {
logger.warn("failed to rollback writer on close", e);
} finally {
indexWriter = null;
store.decRef();
}
} catch (Throwable e) {
logger.warn("failed to rollback writer on close", e);
} finally {
indexWriter = null;
}
}
@ -1472,6 +1456,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
this.searcher = searcher;
this.manager = manager;
this.released = new AtomicBoolean(false);
store.incRef();
}
@Override
@ -1509,6 +1494,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
* underlying store / directory and we call into the
* IndexWriter to free up pending files. */
return false;
} finally {
store.decRef();
}
}
}
@ -1597,9 +1584,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
} finally {
// no need to release the fullSearcher, nothing really is done...
if (currentSearcher != null) {
currentSearcher.release();
}
Releasables.release(currentSearcher);
if (newSearcher != null && closeNewSearcher) {
IOUtils.closeWhileHandlingException(newSearcher.getIndexReader()); // ignore
}
@ -1609,31 +1594,28 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
}
private static final class RecoveryCounter {
private volatile int ongoingRecoveries = 0;
private final class RecoveryCounter implements Releasable {
private final AtomicInteger onGoingRecoveries = new AtomicInteger();
synchronized void increment() {
ongoingRecoveries++;
public void startRecovery() {
store.incRef();
onGoingRecoveries.incrementAndGet();
}
synchronized void decrement() {
ongoingRecoveries--;
if (ongoingRecoveries == 0) {
notifyAll(); // notify waiting threads - we only wait on ongoingRecoveries == 0
}
assert ongoingRecoveries >= 0 : "ongoingRecoveries must be >= 0 but was: " + ongoingRecoveries;
public int get() {
return onGoingRecoveries.get();
}
int get() {
// volatile read - no sync needed
return ongoingRecoveries;
public void endRecovery() throws ElasticsearchException {
store.decRef();
onGoingRecoveries.decrementAndGet();
assert onGoingRecoveries.get() >= 0 : "ongoingRecoveries must be >= 0 but was: " + onGoingRecoveries.get();
}
synchronized int awaitNoRecoveries(long timeout) throws InterruptedException {
if (ongoingRecoveries > 0) { // no loop here - we either time out or we are done!
wait(timeout);
}
return ongoingRecoveries;
@Override
public boolean release() throws ElasticsearchException {
endRecovery();
return true;
}
}

View File

@ -408,113 +408,117 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, repositoryName);
final ImmutableMap<String, BlobMetaData> blobs;
store.incRef();
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
long generation = findLatestFileNameGeneration(blobs);
BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
ArrayList<FileInfo> filesToSnapshot = newArrayList();
for (String fileName : snapshotIndexCommit.getFiles()) {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md;
final ImmutableMap<String, BlobMetaData> blobs;
try {
md = store.metaData(fileName);
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
boolean snapshotRequired = false;
// TODO: For now segment files are copied on each commit because segment files don't have checksum
// if (snapshot.indexChanged() && fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
// snapshotRequired = true; // we want to always snapshot the segment file if the index changed
// }
long generation = findLatestFileNameGeneration(blobs);
BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
// commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
snapshotRequired = true;
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
ArrayList<FileInfo> filesToSnapshot = newArrayList();
for (String fileName : snapshotIndexCommit.getFiles()) {
if (snapshotStatus.aborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md;
try {
md = store.metaData(fileName);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
boolean snapshotRequired = false;
// TODO: For now segment files are copied on each commit because segment files don't have checksum
// if (snapshot.indexChanged() && fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
// snapshotRequired = true; // we want to always snapshot the segment file if the index changed
// }
BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);
if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
// commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
snapshotRequired = true;
}
if (snapshotRequired) {
indexNumberOfFiles++;
indexTotalFilesSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum());
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(fileInfo);
}
}
if (snapshotRequired) {
indexNumberOfFiles++;
indexTotalFilesSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), chunkSize, md.checksum());
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(fileInfo);
snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size());
for (FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo, indexLatch, failures);
} catch (IOException e) {
failures.add(e);
}
}
}
snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize);
snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED);
final CountDownLatch indexLatch = new CountDownLatch(filesToSnapshot.size());
for (FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo, indexLatch, failures);
} catch (IOException e) {
indexLatch.await();
} catch (InterruptedException e) {
failures.add(e);
Thread.currentThread().interrupt();
}
if (!failures.isEmpty()) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", failures.get(0));
}
}
snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration());
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
try {
indexLatch.await();
} catch (InterruptedException e) {
failures.add(e);
Thread.currentThread().interrupt();
}
if (!failures.isEmpty()) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", failures.get(0));
}
String commitPointName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
try {
byte[] snapshotData = writeSnapshot(snapshot);
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
blobContainer.writeBlob(commitPointName, new BytesStreamInput(snapshotData, false), snapshotData.length);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
String commitPointName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
try {
byte[] snapshotData = writeSnapshot(snapshot);
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
blobContainer.writeBlob(commitPointName, new BytesStreamInput(snapshotData, false), snapshotData.length);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
newSnapshotsList.add(snapshot);
for (BlobStoreIndexShardSnapshot point : snapshots) {
newSnapshotsList.add(point);
}
cleanup(newSnapshotsList, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} finally {
store.decRef();
}
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
newSnapshotsList.add(snapshot);
for (BlobStoreIndexShardSnapshot point : snapshots) {
newSnapshotsList.add(point);
}
cleanup(newSnapshotsList, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
}
/**
@ -533,7 +537,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
for (long i = 0; i < fileInfo.numberOfParts(); i++) {
IndexInput indexInput = null;
try {
indexInput = store.openInputRaw(fileInfo.physicalName(), IOContext.READONCE);
final String file = fileInfo.physicalName();
indexInput = store.openInputRaw(file, IOContext.READONCE);
indexInput.seek(i * fileInfo.partBytes());
InputStreamIndexInput inputStreamIndexInput = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes());
@ -545,7 +550,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} else {
inputStream = inputStreamIndexInput;
}
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
inputStream = new AbortableInputStream(inputStream, file);
blobContainer.writeBlob(fileInfo.partName(i), inputStream, size, new ImmutableBlobContainer.WriterListener() {
@Override
public void onCompleted() {
@ -658,99 +663,104 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* Performs restore operation
*/
public void restore() {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfReusedFiles = 0;
long reusedTotalSize = 0;
recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfReusedFiles = 0;
long reusedTotalSize = 0;
List<FileInfo> filesToRecover = Lists.newArrayList();
for (FileInfo fileInfo : snapshot.indexFiles()) {
String fileName = fileInfo.physicalName();
StoreFileMetaData md = null;
try {
md = store.metaData(fileName);
} catch (IOException e) {
// no file
}
numberOfFiles++;
// we don't compute checksum for segments, so always recover them
if (!fileName.startsWith("segments") && md != null && fileInfo.isSame(md)) {
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += md.length();
recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
logger.trace("not_recovering [{}], exists in local store and is same", fileInfo.physicalName());
}
} else {
totalSize += fileInfo.length();
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
} else {
logger.trace("recovering [{}], exists in local store but is different", fileInfo.physicalName());
}
}
}
}
recoveryState.getIndex().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, numberOfFiles, new ByteSizeValue(totalSize), numberOfReusedFiles, new ByteSizeValue(reusedTotalSize));
}
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, latch, failures);
}
List<FileInfo> filesToRecover = Lists.newArrayList();
for (FileInfo fileInfo : snapshot.indexFiles()) {
String fileName = fileInfo.physicalName();
StoreFileMetaData md = null;
try {
md = store.metaData(fileName);
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!failures.isEmpty()) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", failures.get(0));
}
// read the snapshot data persisted
long version = -1;
try {
if (Lucene.indexExists(store.directory())) {
version = Lucene.readSegmentInfos(store.directory()).getVersion();
}
} catch (IOException e) {
// no file
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
numberOfFiles++;
// we don't compute checksum for segments, so always recover them
if (!fileName.startsWith("segments") && md != null && fileInfo.isSame(md)) {
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += md.length();
recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
logger.trace("not_recovering [{}], exists in local store and is same", fileInfo.physicalName());
}
} else {
totalSize += fileInfo.length();
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
} else {
logger.trace("recovering [{}], exists in local store but is different", fileInfo.physicalName());
recoveryState.getIndex().updateVersion(version);
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!snapshot.containPhysicalIndexFile(storeFile)) {
try {
store.directory().deleteFile(storeFile);
} catch (IOException e) {
// ignore
}
}
}
} catch (IOException e) {
// ignore
}
}
recoveryState.getIndex().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, numberOfFiles, new ByteSizeValue(totalSize), numberOfReusedFiles, new ByteSizeValue(reusedTotalSize));
}
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, latch, failures);
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!failures.isEmpty()) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", failures.get(0));
}
// read the snapshot data persisted
long version = -1;
try {
if (Lucene.indexExists(store.directory())) {
version = Lucene.readSegmentInfos(store.directory()).getVersion();
}
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!snapshot.containPhysicalIndexFile(storeFile)) {
try {
store.directory().deleteFile(storeFile);
} catch (IOException e) {
// ignore
}
}
}
} catch (IOException e) {
// ignore
} finally {
store.decRef();
}
}
@ -770,60 +780,76 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// the checksum (because of seek). We will create the checksum file once copying is done
indexOutput = store.createOutputRaw(fileInfo.physicalName());
} catch (IOException e) {
failures.add(e);
latch.countDown();
try {
failures.add(e);
} finally {
latch.countDown();
}
return;
}
String firstFileToRecover = fileInfo.partName(0);
final AtomicInteger partIndex = new AtomicInteger();
blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryState.getIndex().addRecoveredByteCount(size);
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
if (file != null) {
file.updateRecovered(size);
}
indexOutput.writeBytes(data, offset, size);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size));
}
}
@Override
public synchronized void onCompleted() {
int part = partIndex.incrementAndGet();
if (part < fileInfo.numberOfParts()) {
String partName = fileInfo.partName(part);
// continue with the new part
blobContainer.readBlob(partName, this);
return;
} else {
// we are done...
try {
indexOutput.close();
// write the checksum
if (fileInfo.checksum() != null) {
store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
} catch (IOException e) {
onFailure(e);
return;
boolean success = false;
try {
blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryState.getIndex().addRecoveredByteCount(size);
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
if (file != null) {
file.updateRecovered(size);
}
indexOutput.writeBytes(data, offset, size);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size));
}
}
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
failures.add(t);
@Override
public synchronized void onCompleted() {
int part = partIndex.incrementAndGet();
if (part < fileInfo.numberOfParts()) {
String partName = fileInfo.partName(part);
// continue with the new part
blobContainer.readBlob(partName, this);
return;
} else {
// we are done...
try {
indexOutput.close();
// write the checksum
if (fileInfo.checksum() != null) {
store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
} catch (IOException e) {
onFailure(e);
return;
}
}
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
try {
IOUtils.closeWhileHandlingException(indexOutput);
failures.add(t);
} finally {
latch.countDown();
}
}
});
success = true;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(indexOutput);
latch.countDown();
}
});
}
}
}

View File

@ -50,6 +50,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
/**
@ -61,6 +63,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
public static final boolean isChecksum(String name) {
return name.startsWith(CHECKSUMS_PREFIX);
}
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicInteger refCount = new AtomicInteger(1);
private final IndexStore indexStore;
final CodecService codecService;
@ -84,14 +88,23 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public IndexStore indexStore() {
ensureOpen();
return this.indexStore;
}
public Directory directory() {
ensureOpen();
return directory;
}
private final void ensureOpen() {
if (this.refCount.get() <= 0) {
throw new AlreadyClosedException("Store is already closed");
}
}
public ImmutableMap<String, StoreFileMetaData> list() throws IOException {
ensureOpen();
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) {
StoreFileMetaData md = metaData(name);
@ -103,6 +116,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public StoreFileMetaData metaData(String name) throws IOException {
ensureOpen();
StoreFileMetaData md = filesMetadata.get(name);
if (md == null) {
return null;
@ -118,6 +132,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* Deletes the content of a shard store. Be careful calling this!.
*/
public void deleteContent() throws IOException {
ensureOpen();
String[] files = directory.listAll();
IOException lastException = null;
for (String file : files) {
@ -143,14 +158,17 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
}
public ByteSizeValue estimateSize() throws IOException {
ensureOpen();
return new ByteSizeValue(Directories.estimateSize(directory));
}
public void renameFile(String from, String to) throws IOException {
ensureOpen();
synchronized (mutex) {
StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one
if (fromMetaData == null) {
@ -171,19 +189,11 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
return readChecksums(dirs, null);
} finally {
for (Directory dir : dirs) {
if (dir != null) {
try {
dir.close();
} catch (IOException e) {
// ignore
}
}
}
IOUtils.closeWhileHandlingException(dirs);
}
}
static Map<String, String> readChecksums(Directory[] dirs, Map<String, String> defaultValue) throws IOException {
private static Map<String, String> readChecksums(Directory[] dirs, Map<String, String> defaultValue) throws IOException {
long lastFound = -1;
Directory lastDir = null;
for (Directory dir : dirs) {
@ -214,6 +224,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public void writeChecksums() throws IOException {
ensureOpen();
ImmutableMap<String, StoreFileMetaData> files = list();
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
synchronized (mutex) {
@ -253,18 +264,51 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
return false;
}
public final void incRef() {
do {
int i = refCount.get();
if (i > 0) {
if (refCount.compareAndSet(i, i+1)) {
return;
}
} else {
throw new AlreadyClosedException("Store is already closed can't increment refCount current count [" + i + "]");
}
} while(true);
}
public final void decRef() {
int i = refCount.decrementAndGet();
assert i >= 0;
if (i == 0) {
closeInternal();
}
}
public void close() {
try {
directory.close();
} catch (IOException e) {
logger.debug("failed to close directory", e);
if (isClosed.compareAndSet(false, true)) {
// only do this once!
decRef();
}
}
private void closeInternal() {
synchronized (mutex) { // if we close the dir we need to make sure nobody writes checksums
try {
directory.closeInternal(); // don't call close here we throw an exception there!
} catch (IOException e) {
logger.debug("failed to close directory", e);
}
}
}
/**
* Creates a raw output, no checksum is computed, and no compression if enabled.
*/
public IndexOutput createOutputRaw(String name) throws IOException {
ensureOpen();
return directory.createOutput(name, IOContext.DEFAULT, true);
}
@ -272,6 +316,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* Opened an index input in raw form, no decompression for example.
*/
public IndexInput openInputRaw(String name, IOContext context) throws IOException {
ensureOpen();
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) {
throw new FileNotFoundException(name);
@ -280,6 +325,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public void writeChecksum(String name, String checksum) throws IOException {
ensureOpen();
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
@ -290,6 +336,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
public void writeChecksums(Map<String, String> checksums) throws IOException {
ensureOpen();
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
for (Map.Entry<String, String> entry : checksums.entrySet()) {
@ -506,14 +553,20 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
@Override
public synchronized void close() throws IOException {
isOpen = false;
for (Directory delegate : distributor.all()) {
delegate.close();
}
synchronized (mutex) {
filesMetadata = ImmutableOpenMap.of();
files = Strings.EMPTY_ARRAY;
public void close() throws IOException {
assert false : "Nobody should close this directory except of the Store itself";
}
synchronized void closeInternal() throws IOException {
if (isOpen) {
isOpen = false;
for (Directory delegate : distributor.all()) {
delegate.close();
}
synchronized (mutex) {
filesMetadata = ImmutableOpenMap.of();
files = Strings.EMPTY_ARRAY;
}
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
@ -119,11 +120,13 @@ public class RecoverySource extends AbstractComponent {
public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchException {
long totalSize = 0;
long existingTotalSize = 0;
final Store store = shard.store();
store.incRef();
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
StoreFileMetaData md = shard.store().metaData(name);
StoreFileMetaData md = store.metaData(name);
boolean useExisting = false;
if (request.existingFiles().containsKey(name)) {
// we don't compute checksum for segments, so always recover them
@ -173,12 +176,13 @@ public class RecoverySource extends AbstractComponent {
@Override
public void run() {
IndexInput indexInput = null;
store.incRef();
try {
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
byte[] buf = new byte[BUFFER_SIZE];
StoreFileMetaData md = shard.store().metaData(name);
StoreFileMetaData md = store.metaData(name);
// TODO: maybe use IOContext.READONCE?
indexInput = shard.store().openInputRaw(name, IOContext.READ);
indexInput = store.openInputRaw(name, IOContext.READ);
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
@ -207,7 +211,11 @@ public class RecoverySource extends AbstractComponent {
lastException.set(e);
} finally {
IOUtils.closeWhileHandlingException(indexInput);
latch.countDown();
try {
store.decRef();
} finally {
latch.countDown();
}
}
}
});
@ -229,6 +237,8 @@ public class RecoverySource extends AbstractComponent {
response.phase1Time = stopWatch.totalTime().millis();
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
store.decRef();
}
}

View File

@ -523,47 +523,52 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(request.shardId());
}
Store store = onGoingRecovery.indexShard.store();
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String prefix = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + ".";
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : store.directory().listAll()) {
if (existingFile.startsWith(prefix)) {
filesToRename.add(existingFile.substring(prefix.length(), existingFile.length()));
}
}
Exception failureToRename = null;
if (!filesToRename.isEmpty()) {
// first, go and delete the existing ones
final Directory directory = store.directory();
for (String file : filesToRename) {
try {
directory.deleteFile(file);
} catch (Throwable ex) {
logger.debug("failed to delete file [{}]", ex, file);
final Store store = onGoingRecovery.indexShard.store();
store.incRef();
try {
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String prefix = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + ".";
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : store.directory().listAll()) {
if (existingFile.startsWith(prefix)) {
filesToRename.add(existingFile.substring(prefix.length(), existingFile.length()));
}
}
for (String fileToRename : filesToRename) {
// now, rename the files... and fail it it won't work
store.renameFile(prefix + fileToRename, fileToRename);
Exception failureToRename = null;
if (!filesToRename.isEmpty()) {
// first, go and delete the existing ones
final Directory directory = store.directory();
for (String file : filesToRename) {
try {
directory.deleteFile(file);
} catch (Throwable ex) {
logger.debug("failed to delete file [{}]", ex, file);
}
}
for (String fileToRename : filesToRename) {
// now, rename the files... and fail it it won't work
store.renameFile(prefix + fileToRename, fileToRename);
}
}
}
// now write checksums
store.writeChecksums(onGoingRecovery.checksums);
// now write checksums
store.writeChecksums(onGoingRecovery.checksums);
for (String existingFile : store.directory().listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) {
try {
store.directory().deleteFile(existingFile);
} catch (Exception e) {
// ignore, we don't really care, will get deleted later on
for (String existingFile : store.directory().listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!request.snapshotFiles().contains(existingFile) && !Store.isChecksum(existingFile)) {
try {
store.directory().deleteFile(existingFile);
} catch (Exception e) {
// ignore, we don't really care, will get deleted later on
}
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} finally {
store.decRef();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -592,75 +597,79 @@ public class RecoveryTarget extends AbstractComponent {
}
Store store = onGoingRecovery.indexShard.store();
store.incRef();
try {
IndexOutput indexOutput;
if (request.position() == 0) {
// first request
onGoingRecovery.checksums.remove(request.name());
indexOutput = onGoingRecovery.removeOpenIndexOutputs(request.name());
IOUtils.closeWhileHandlingException(indexOutput);
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
IndexOutput indexOutput;
if (request.position() == 0) {
// first request
onGoingRecovery.checksums.remove(request.name());
indexOutput = onGoingRecovery.removeOpenIndexOutputs(request.name());
IOUtils.closeWhileHandlingException(indexOutput);
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
// also, we check if the file already exists, if it does, we create a file name based
// on the current recovery "id" and later we make the switch, the reason for that is that
// we only want to overwrite the index files once we copied all over, and not create a
// case where the index is half moved
// also, we check if the file already exists, if it does, we create a file name based
// on the current recovery "id" and later we make the switch, the reason for that is that
// we only want to overwrite the index files once we copied all over, and not create a
// case where the index is half moved
String fileName = request.name();
if (store.directory().fileExists(fileName)) {
fileName = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + "." + fileName;
String fileName = request.name();
if (store.directory().fileExists(fileName)) {
fileName = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + "." + fileName;
}
indexOutput = onGoingRecovery.openAndPutIndexOutput(request.name(), fileName, store);
} else {
indexOutput = onGoingRecovery.getOpenIndexOutput(request.name());
}
indexOutput = onGoingRecovery.openAndPutIndexOutput(request.name(), fileName, store);
} else {
indexOutput = onGoingRecovery.getOpenIndexOutput(request.name());
}
if (indexOutput == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
boolean success = false;
synchronized (indexOutput) {
try {
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(request.content().length());
}
BytesReference content = request.content();
if (!content.hasArray()) {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
onGoingRecovery.recoveryState.getIndex().addRecoveredByteCount(request.length());
RecoveryState.File file = onGoingRecovery.recoveryState.getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() == request.length()) {
// we are done
indexOutput.close();
// write the checksum
if (request.checksum() != null) {
onGoingRecovery.checksums.put(request.name(), request.checksum());
if (indexOutput == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
boolean success = false;
synchronized (indexOutput) {
try {
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(request.content().length());
}
BytesReference content = request.content();
if (!content.hasArray()) {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
onGoingRecovery.recoveryState.getIndex().addRecoveredByteCount(request.length());
RecoveryState.File file = onGoingRecovery.recoveryState.getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() == request.length()) {
// we are done
indexOutput.close();
// write the checksum
if (request.checksum() != null) {
onGoingRecovery.checksums.put(request.name(), request.checksum());
}
store.directory().sync(Collections.singleton(request.name()));
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
onGoingRecovery.recoveryState.getIndex().addRecoveredFileCount(1);
assert remove == null || remove == indexOutput; // remove maybe null if we got canceled
}
success = true;
} finally {
if (!success || onGoingRecovery.isCanceled()) {
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
assert remove == null || remove == indexOutput;
IOUtils.closeWhileHandlingException(indexOutput);
}
store.directory().sync(Collections.singleton(request.name()));
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
onGoingRecovery.recoveryState.getIndex().addRecoveredFileCount(1);
assert remove == indexOutput;
}
success = true;
} finally {
if (!success || onGoingRecovery.isCanceled()) {
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
assert remove == indexOutput;
IOUtils.closeWhileHandlingException(indexOutput);
}
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} finally {
store.decRef();
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.test.TestCluster.RestartCallback;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -304,8 +305,8 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
ImmutableSettings.Builder settings = settingsBuilder()
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put("gateway.recover_after_nodes", 4)
.put(BalancedShardsAllocator.SETTING_THRESHOLD, 1.1f); // use less aggressive settings
.put(MockDirectoryHelper.CRASH_INDEX, false)
.put(BalancedShardsAllocator.SETTING_THRESHOLD, 1.1f); // use less agressive settings
cluster().startNode(settings);
cluster().startNode(settings);
@ -319,6 +320,8 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
client().admin().indices().prepareFlush().execute().actionGet();
}
}
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("Running Cluster Health");
ensureGreen();

View File

@ -70,9 +70,7 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchIntegrationTest {
}
logger.info("Increasing the number of replicas from 1 to 2");
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("index.number_of_replicas", 2)).execute().actionGet();
Thread.sleep(200);
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("index.number_of_replicas", 2)).execute().actionGet());
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().setWaitForActiveShards(numShards.numPrimaries * 2).execute().actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
@ -85,10 +83,9 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchIntegrationTest {
logger.info("starting another node to new replicas will be allocated to it");
allowNodes("test", 3);
Thread.sleep(100);
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes(">=3").execute().actionGet();
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes(">=3").execute().actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
@ -103,11 +100,10 @@ public class UpdateNumberOfReplicasTests extends ElasticsearchIntegrationTest {
}
logger.info("Decreasing number of replicas from 2 to 0");
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("index.number_of_replicas", 0)).get();
Thread.sleep(200);
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("index.number_of_replicas", 0)).get());
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForNodes(">=3").execute().actionGet();
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().setWaitForRelocatingShards(0).setWaitForNodes(">=3").execute().actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));

View File

@ -38,6 +38,7 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.engine.MockInternalEngine;
import org.elasticsearch.test.engine.ThrowingAtomicReaderWrapper;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -83,16 +84,42 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
exceptionRate = 0d;
exceptionOnOpenRate = 0d;
}
Builder settings = settingsBuilder()
.put(indexSettings())
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate)
.put(MockDirectoryHelper.CHECK_INDEX_ON_CLOSE, true);
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
assertAcked(prepareCreate("test")
.setSettings(settings)
.addMapping("type", mapping));
boolean createIndexWithoutErrors = randomBoolean();
long numInitialDocs = 0;
if (createIndexWithoutErrors) {
Builder settings = settingsBuilder()
.put("index.number_of_replicas", randomIntBetween(0, 1))
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, true)
.put("gateway.type", "local");
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
client().admin().indices().prepareCreate("test")
.setSettings(settings)
.addMapping("type", mapping).execute().actionGet();
numInitialDocs = between(10, 100);
ensureYellow();
for (int i = 0; i < numInitialDocs ; i++) {
client().prepareIndex("test", "type", "" + i).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", "init").get();
}
client().admin().indices().prepareRefresh("test").execute().get();
client().admin().indices().prepareFlush("test").execute().get();
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, true)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate));
client().admin().indices().prepareOpen("test").execute().get();
} else {
Builder settings = settingsBuilder()
.put("index.number_of_replicas", randomIntBetween(0, 1))
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, exceptionRate)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, exceptionOnOpenRate); // we cannot expect that the index will be valid
logger.info("creating index: [test] using settings: [{}]", settings.build().getAsMap());
client().admin().indices().prepareCreate("test")
.setSettings(settings)
.addMapping("type", mapping).execute().actionGet();
}
ClusterHealthResponse clusterHealthResponse = client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForYellowStatus().timeout(TimeValue.timeValueSeconds(5))).get(); // it's OK to timeout here
final int numDocs;
@ -113,7 +140,7 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
boolean[] added = new boolean[numDocs];
for (int i = 0; i < numDocs ; i++) {
try {
IndexResponse indexResponse = client().prepareIndex("test", "type", "" + i).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", English.intToEnglish(i)).get();
IndexResponse indexResponse = client().prepareIndex("test", "type", "" + (i + numInitialDocs)).setTimeout(TimeValue.timeValueSeconds(1)).setSource("test", English.intToEnglish(i)).get();
if (indexResponse.isCreated()) {
numCreated++;
added[i] = true;
@ -143,7 +170,18 @@ public class SearchWithRandomExceptionsTests extends ElasticsearchIntegrationTes
logger.info("expected SearchPhaseException: [{}]", ex.getMessage());
}
}
}
if (createIndexWithoutErrors) {
// check the index still contains the records that we indexed without errors
client().admin().indices().prepareClose("test").execute().get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder()
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, 0)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0));
client().admin().indices().prepareOpen("test").execute().get();
ensureYellow();
SearchResponse searchResponse = client().prepareSearch().setQuery(QueryBuilders.matchQuery("test", "init")).get();
assertThat(searchResponse.getHits().totalHits(), Matchers.equalTo(numInitialDocs));
}
}

View File

@ -231,19 +231,23 @@ public class MockRepository extends FsRepository {
}
}
private void maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.WriterListener listener) {
private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.WriterListener listener) {
try {
maybeIOExceptionOrBlock(blobName);
return true;
} catch (IOException ex) {
listener.onFailure(ex);
return false;
}
}
private void maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.ReadBlobListener listener) {
private boolean maybeIOExceptionOrBlock(String blobName, ImmutableBlobContainer.ReadBlobListener listener) {
try {
maybeIOExceptionOrBlock(blobName);
return true;
} catch (IOException ex) {
listener.onFailure(ex);
return false;
}
}
@ -254,8 +258,9 @@ public class MockRepository extends FsRepository {
@Override
public void writeBlob(String blobName, InputStream is, long sizeInBytes, WriterListener listener) {
maybeIOExceptionOrBlock(blobName, listener);
super.writeBlob(blobName, is, sizeInBytes, listener);
if (maybeIOExceptionOrBlock(blobName, listener) ) {
super.writeBlob(blobName, is, sizeInBytes, listener);
}
}
@Override
@ -271,8 +276,9 @@ public class MockRepository extends FsRepository {
@Override
public void readBlob(String blobName, ReadBlobListener listener) {
maybeIOExceptionOrBlock(blobName, listener);
super.readBlob(blobName, listener);
if (maybeIOExceptionOrBlock(blobName, listener)) {
super.readBlob(blobName, listener);
}
}
@Override

View File

@ -311,13 +311,14 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
private static void randomIndexTemplate() {
// TODO move settings for random directory etc here into the index based randomized settings.
if (cluster().size() > 0) {
ImmutableSettings.Builder builder = setRandomNormsLoading(setRandomMerge(getRandom(), ImmutableSettings.builder())
.put(INDEX_SEED_SETTING, randomLong()))
.put(SETTING_NUMBER_OF_SHARDS, between(DEFAULT_MIN_NUM_SHARDS, DEFAULT_MAX_NUM_SHARDS))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, 1));
client().admin().indices().preparePutTemplate("random_index_template")
.setTemplate("*")
.setOrder(0)
.setSettings(setRandomNormsLoading(setRandomMerge(getRandom(), ImmutableSettings.builder())
.put(INDEX_SEED_SETTING, randomLong()))
.put(SETTING_NUMBER_OF_SHARDS, between(DEFAULT_MIN_NUM_SHARDS, DEFAULT_MAX_NUM_SHARDS))
.put(SETTING_NUMBER_OF_REPLICAS, between(0, 1)))
.setSettings(builder)
.execute().actionGet();
}
}

View File

@ -52,6 +52,8 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.is;
/**
* Base testcase for randomized unit testing with Elasticsearch
*/
@ -145,10 +147,28 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
public static void ensureAllFilesClosed() throws IOException {
try {
for (MockDirectoryHelper.ElasticsearchMockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
if (w.isOpen()) {
w.closeWithRuntimeException();
for (final MockDirectoryHelper.ElasticsearchMockDirectoryWrapper w : MockDirectoryHelper.wrappers) {
try {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return !w.isOpen();
}
});
} catch (InterruptedException e) {
Thread.interrupted();
}
if (!w.successfullyClosed()) {
if (w.closeException() == null) {
w.close();
if (w.closeException() == null) {
throw w.closeException();
}
} else {
throw w.closeException();
}
}
assertThat(w.isOpen(), is(false));
}
} finally {
forceClearMockWrappers();

View File

@ -181,14 +181,16 @@ public final class TestCluster implements Iterable<Client> {
logger.info("Setup TestCluster [{}] with seed [{}] using [{}] nodes", clusterName, SeedUtils.formatSeed(clusterSeed), numSharedNodes);
this.nodeSettingsSource = nodeSettingsSource;
Builder builder = ImmutableSettings.settingsBuilder();
// randomize (multi/single) data path, special case for 0, don't set it at all...
int numOfDataPaths = random.nextInt(5);
if (numOfDataPaths > 0) {
StringBuilder dataPath = new StringBuilder();
for (int i = 0; i < numOfDataPaths; i++) {
dataPath.append("data/d").append(i).append(',');
if (random.nextInt(5) == 0) { // sometimes set this
// randomize (multi/single) data path, special case for 0, don't set it at all...
final int numOfDataPaths = random.nextInt(5);
if (numOfDataPaths > 0) {
StringBuilder dataPath = new StringBuilder();
for (int i = 0; i < numOfDataPaths; i++) {
dataPath.append("data/d").append(i).append(',');
}
builder.put("path.data", dataPath.toString());
}
builder.put("path.data", dataPath.toString());
}
defaultSettings = builder.build();

View File

@ -83,7 +83,8 @@ public final class MockInternalEngine extends InternalEngine implements Engine {
}
public void close() throws ElasticsearchException {
@Override
public void close() {
try {
super.close();
} finally {

View File

@ -36,7 +36,6 @@ import org.elasticsearch.index.store.fs.MmapFsDirectoryService;
import org.elasticsearch.index.store.fs.NioFsDirectoryService;
import org.elasticsearch.index.store.fs.SimpleFsDirectoryService;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException;
import java.util.Random;
@ -46,40 +45,37 @@ public class MockDirectoryHelper {
public static final String RANDOM_IO_EXCEPTION_RATE = "index.store.mock.random.io_exception_rate";
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open";
public static final String RANDOM_THROTTLE = "index.store.mock.random.throttle";
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
public static final String RANDOM_PREVENT_DOUBLE_WRITE = "index.store.mock.random.prevent_double_write";
public static final String RANDOM_NO_DELETE_OPEN_FILE = "index.store.mock.random.no_delete_open_file";
public static final String RANDOM_FAIL_ON_CLOSE= "index.store.mock.random.fail_on_close";
public static final String CRASH_INDEX = "index.store.mock.random.crash_index";
public static final Set<ElasticsearchMockDirectoryWrapper> wrappers = ConcurrentCollections.newConcurrentSet();
private final Random random;
private final double randomIOExceptionRate;
private final double randomIOExceptionRateOnOpen;
private final Throttling throttle;
private final boolean checkIndexOnClose;
private final Settings indexSettings;
private final ShardId shardId;
private final boolean preventDoubleWrite;
private final boolean noDeleteOpenFile;
private final ESLogger logger;
private final boolean failOnClose;
private final boolean crashIndex;
public MockDirectoryHelper(ShardId shardId, Settings indexSettings, ESLogger logger) {
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0l);
random = new Random(seed);
public MockDirectoryHelper(ShardId shardId, Settings indexSettings, ESLogger logger, Random random, long seed) {
this.random = random;
randomIOExceptionRate = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE, 0.0d);
randomIOExceptionRateOnOpen = indexSettings.getAsDouble(RANDOM_IO_EXCEPTION_RATE_ON_OPEN, 0.0d);
preventDoubleWrite = indexSettings.getAsBoolean(RANDOM_PREVENT_DOUBLE_WRITE, true); // true is default in MDW
noDeleteOpenFile = indexSettings.getAsBoolean(RANDOM_NO_DELETE_OPEN_FILE, random.nextBoolean()); // true is default in MDW
random.nextInt(shardId.getId() + 1); // some randomness per shard
throttle = Throttling.valueOf(indexSettings.get(RANDOM_THROTTLE, random.nextDouble() < 0.1 ? "SOMETIMES" : "NEVER"));
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, false);// we can't do this by default since it might close the index input that we still read from in a pending fetch phase.
failOnClose = indexSettings.getAsBoolean(RANDOM_FAIL_ON_CLOSE, false);
crashIndex = indexSettings.getAsBoolean(CRASH_INDEX, true);
if (logger.isDebugEnabled()) {
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] checkIndexOnClose: [{}]", SeedUtils.formatSeed(seed),
throttle, checkIndexOnClose);
logger.debug("Using MockDirWrapper with seed [{}] throttle: [{}] crashIndex: [{}]", SeedUtils.formatSeed(seed),
throttle, crashIndex);
}
this.indexSettings = indexSettings;
this.shardId = shardId;
@ -87,11 +83,11 @@ public class MockDirectoryHelper {
}
public Directory wrap(Directory dir) {
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, logger, failOnClose);
final ElasticsearchMockDirectoryWrapper w = new ElasticsearchMockDirectoryWrapper(random, dir, logger, this.crashIndex);
w.setRandomIOExceptionRate(randomIOExceptionRate);
w.setRandomIOExceptionRateOnOpen(randomIOExceptionRateOnOpen);
w.setThrottling(throttle);
w.setCheckIndexOnClose(checkIndexOnClose);
w.setCheckIndexOnClose(false); // we do this on the index level
w.setPreventDoubleWrite(preventDoubleWrite);
w.setNoDeleteOpenFile(noDeleteOpenFile);
wrappers.add(w);
@ -128,31 +124,39 @@ public class MockDirectoryHelper {
public static final class ElasticsearchMockDirectoryWrapper extends MockDirectoryWrapper {
private final ESLogger logger;
private final boolean failOnClose;
private final boolean crash;
private RuntimeException closeException;
public ElasticsearchMockDirectoryWrapper(Random random, Directory delegate, ESLogger logger, boolean failOnClose) {
public ElasticsearchMockDirectoryWrapper(Random random, Directory delegate, ESLogger logger, boolean crash) {
super(random, delegate);
this.crash = crash;
this.logger = logger;
this.failOnClose = failOnClose;
}
@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
try {
super.close();
} catch (RuntimeException ex) {
if (failOnClose) {
throw ex;
}
// we catch the exception on close to properly close shards even if there are open files
// the test framework will call closeWithRuntimeException after the test exits to fail
// on unclosed files.
logger.debug("MockDirectoryWrapper#close() threw exception", ex);
logger.info("MockDirectoryWrapper#close() threw exception", ex);
closeException = ex;
throw ex;
}
}
public void closeWithRuntimeException() throws IOException {
super.close(); // force fail if open files etc. called in tear down of ElasticsearchIntegrationTest
public synchronized boolean successfullyClosed() {
return closeException == null && !isOpen();
}
public synchronized RuntimeException closeException() {
return closeException;
}
@Override
public synchronized void crash() throws IOException {
if (crash) {
super.crash();
}
}
}
}

View File

@ -19,29 +19,61 @@
package org.elasticsearch.test.store;
import com.google.common.base.Charsets;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.fs.FsDirectoryService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Random;
public class MockFSDirectoryService extends FsDirectoryService {
private final MockDirectoryHelper helper;
private FsDirectoryService delegateService;
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
private final boolean checkIndexOnClose;
@Inject
public MockFSDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
public MockFSDirectoryService(final ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, final IndicesService service) {
super(shardId, indexSettings, indexStore);
helper = new MockDirectoryHelper(shardId, indexSettings, logger);
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0l);
Random random = new Random(seed);
helper = new MockDirectoryHelper(shardId, indexSettings, logger, random, seed);
checkIndexOnClose = indexSettings.getAsBoolean(CHECK_INDEX_ON_CLOSE, random.nextDouble() < 0.1);
delegateService = helper.randomDirectorService(indexStore);
if (checkIndexOnClose) {
final IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
@Override
public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard) {
if (shardId.equals(sid) && indexShard != null) {
checkIndex(((InternalIndexShard) indexShard).store());
}
service.indicesLifecycle().removeListener(this);
}
};
service.indicesLifecycle().addListener(listener);
}
}
@Override
@ -53,6 +85,29 @@ public class MockFSDirectoryService extends FsDirectoryService {
protected synchronized FSDirectory newFSDirectory(File location, LockFactory lockFactory) throws IOException {
throw new UnsupportedOperationException();
}
private void checkIndex(Store store) throws IndexShardException {
try {
if (!Lucene.indexExists(store.directory())) {
return;
}
CheckIndex checkIndex = new CheckIndex(store.directory());
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
checkIndex.setInfoStream(out);
out.flush();
CheckIndex.Status status = checkIndex.checkIndex();
if (!status.clean) {
logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
throw new IndexShardException(shardId, "index check failure");
} else {
if (logger.isDebugEnabled()) {
logger.debug("check index [success]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
}
}
} catch (Exception e) {
logger.warn("failed to check index", e);
}
}
}

View File

@ -25,8 +25,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException;
import java.util.Random;
public class MockRamDirectoryService extends AbstractIndexShardComponent implements DirectoryService {
@ -36,7 +38,9 @@ public class MockRamDirectoryService extends AbstractIndexShardComponent impleme
@Inject
public MockRamDirectoryService(ShardId shardId, Settings indexSettings) {
super(shardId, indexSettings);
helper = new MockDirectoryHelper(shardId, indexSettings, logger);
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.INDEX_SEED_SETTING, 0l);
Random random = new Random(seed);
helper = new MockDirectoryHelper(shardId, indexSettings, logger, random, seed);
delegateService = helper.randomRamDirectoryService();
}