internally store ongoing state of recovery from gateway

kimchy 2010-08-17 08:17:29 +03:00
parent 92fd9af2b9
commit 29e981d28d
4 changed files with 130 additions and 58 deletions
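The point of the change: instead of building an immutable RecoveryStatus only once recovery has finished, each shard gateway now owns a mutable RecoveryStatus that it updates in place while recovery runs, exposed through a new recoveryStatus() accessor. Below is a minimal sketch of how a caller could observe in-flight progress through that accessor; the watcher class and its polling loop are hypothetical and not part of this commit, and the import paths assume the index gateway package of the tree at the time.

import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGateway.RecoveryStatus;

// Hypothetical monitoring helper: polls the in-flight state this commit starts tracking.
public class RecoveryProgressWatcher {

    public static void waitUntilDone(IndexShardGateway gateway) throws InterruptedException {
        RecoveryStatus status = gateway.recoveryStatus();
        while (status.stage() != RecoveryStatus.Stage.DONE) {
            System.out.println("stage [" + status.stage()
                    + "], index_bytes [" + status.index().currentFilesSize()
                    + "], translog_ops [" + status.translog().currentTranslogOperations() + "]");
            Thread.sleep(1000); // re-check once per second
        }
    }
}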

IndexShardGateway.java

@@ -27,6 +27,8 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.shard.IndexShardComponent;
 import org.elasticsearch.index.translog.Translog;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.elasticsearch.common.unit.TimeValue.*;
 
 /**
@@ -36,6 +38,8 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
 
     String type();
 
+    RecoveryStatus recoveryStatus();
+
     /**
      * Recovers the state of the shard from the gateway.
      */
@@ -195,13 +199,26 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
     class RecoveryStatus {
 
-        private Index index;
+        public static enum Stage {
+            NONE,
+            INDEX,
+            TRANSLOG,
+            DONE
+        }
 
-        private Translog translog;
+        private Stage stage = Stage.NONE;
 
-        public RecoveryStatus(Index index, Translog translog) {
-            this.index = index;
-            this.translog = translog;
+        private Index index = new Index();
+
+        private Translog translog = new Translog();
+
+        public Stage stage() {
+            return this.stage;
+        }
+
+        public RecoveryStatus updateStage(Stage stage) {
+            this.stage = stage;
+            return this;
         }
 
         public Index index() {
@@ -213,45 +230,60 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
         }
 
         public static class Translog {
-            public static final Translog EMPTY = new Translog(0, TimeValue.timeValueMillis(0));
+            volatile long currentTranslogOperations = 0;
 
-            private int numberOfOperations;
+            private long startTime = -1;
+            private long took;
 
-            private TimeValue took;
-
-            public Translog(int numberOfOperations, TimeValue took) {
-                this.numberOfOperations = numberOfOperations;
-                this.took = took;
+            public long startTime() {
+                return this.startTime;
             }
 
-            public int numberOfOperations() {
-                return numberOfOperations;
+            public void startTime(long startTime) {
+                this.startTime = startTime;
             }
 
             public TimeValue took() {
-                return this.took;
+                return new TimeValue(this.took);
+            }
+
+            public void took(long took) {
+                this.took = took;
+            }
+
+            public void addTranslogOperations(long count) {
+                this.currentTranslogOperations += count;
+            }
+
+            public long currentTranslogOperations() {
+                return this.currentTranslogOperations;
             }
         }
 
         public static class Index {
-            public static final Index EMPTY = new Index(-1, 0, new ByteSizeValue(0), 0, new ByteSizeValue(0), timeValueMillis(0), timeValueMillis(0));
+            private long startTime = -1;
+            private long took = -1;
 
-            private long version;
-            private int numberOfFiles;
-            private ByteSizeValue totalSize;
-            private int numberOfExistingFiles;
-            private ByteSizeValue existingTotalSize;
-            private TimeValue throttlingWaitTime;
-            private TimeValue took;
+            private long version = -1;
+            private int numberOfFiles = 0;
+            private long totalSize = 0;
+            private int numberOfExistingFiles = 0;
+            private long existingTotalSize = 0;
+            private AtomicLong throttlingWaitTime = new AtomicLong();
+            private AtomicLong currentFilesSize = new AtomicLong();
 
-            public Index(long version, int numberOfFiles, ByteSizeValue totalSize,
-                         int numberOfExistingFiles, ByteSizeValue existingTotalSize,
-                         TimeValue throttlingWaitTime, TimeValue took) {
-                this.version = version;
-                this.numberOfFiles = numberOfFiles;
-                this.totalSize = totalSize;
-                this.numberOfExistingFiles = numberOfExistingFiles;
-                this.existingTotalSize = existingTotalSize;
-                this.throttlingWaitTime = throttlingWaitTime;
+            public long startTime() {
+                return this.startTime;
+            }
+
+            public void startTime(long startTime) {
+                this.startTime = startTime;
+            }
+
+            public TimeValue took() {
+                return new TimeValue(this.took);
+            }
+
+            public void took(long took) {
                 this.took = took;
             }
@@ -259,12 +291,19 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
                 return this.version;
             }
 
+            public void files(int numberOfFiles, long totalSize, int numberOfExistingFiles, long existingTotalSize) {
+                this.numberOfFiles = numberOfFiles;
+                this.totalSize = totalSize;
+                this.numberOfExistingFiles = numberOfExistingFiles;
+                this.existingTotalSize = existingTotalSize;
+            }
+
             public int numberOfFiles() {
                 return numberOfFiles;
             }
 
             public ByteSizeValue totalSize() {
-                return totalSize;
+                return new ByteSizeValue(totalSize);
             }
 
             public int numberOfExistingFiles() {
@@ -272,15 +311,27 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
             }
 
             public ByteSizeValue existingTotalSize() {
-                return existingTotalSize;
+                return new ByteSizeValue(existingTotalSize);
+            }
+
+            public void addThrottlingTime(long delta) {
+                throttlingWaitTime.addAndGet(delta);
             }
 
             public TimeValue throttlingWaitTime() {
-                return throttlingWaitTime;
+                return new TimeValue(throttlingWaitTime.get());
             }
 
-            public TimeValue took() {
-                return this.took;
+            public void updateVersion(long version) {
+                this.version = version;
+            }
+
+            public long currentFilesSize() {
+                return this.currentFilesSize.get();
+            }
+
+            public void addCurrentFilesSize(long updatedSize) {
+                this.currentFilesSize.addAndGet(updatedSize);
             }
         }
     }
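The new fields add up to a small in-place update protocol that a gateway implementation follows inside recover(): record a start time, advance the stage, do the work, record how long it took. A condensed sketch of that protocol, assuming a gateway that already holds a recoveryStatus field and has recoverIndex()/recoverTranslog() helpers (the blob store gateway further down follows essentially this shape):

import org.elasticsearch.index.gateway.IndexShardGateway.RecoveryStatus;

// Sketch only: the abstract helpers stand in for a real implementation's work.
abstract class RecoveryStatusDrivenGateway {

    protected final RecoveryStatus recoveryStatus = new RecoveryStatus();

    protected abstract void recoverIndex();     // expected to call files(), updateVersion(), addCurrentFilesSize()

    protected abstract void recoverTranslog();  // expected to call addTranslogOperations() per replayed operation

    public RecoveryStatus recover() {
        recoveryStatus.index().startTime(System.currentTimeMillis());
        recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
        recoverIndex();
        recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());

        recoveryStatus.translog().startTime(System.currentTimeMillis());
        recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
        recoverTranslog();
        recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.translog().startTime());

        recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
        return recoveryStatus;
    }
}

Counters that other threads bump or read while recovery is running (throttlingWaitTime, currentFilesSize) are backed by AtomicLong, while the single-threaded bookkeeping stays in plain long fields.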

IndexShardGatewayService.java

@@ -159,7 +159,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
                 sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
                 sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
                 sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
-                sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]");
+                sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]");
                 logger.debug(sb.toString());
             }
             // refresh the shard

BlobStoreIndexShardGateway.java

@@ -88,6 +88,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
 
     protected final RecoveryThrottler recoveryThrottler;
 
+    private final RecoveryStatus recoveryStatus;
 
     protected final ByteSizeValue chunkSize;
@@ -122,6 +123,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         this.indexContainer = blobStore.immutableBlobContainer(blobStoreIndexGateway.shardIndexPath(shardId.id()));
         this.translogContainer = blobStore.appendableBlobContainer(blobStoreIndexGateway.shardTranslogPath(shardId.id()));
+
+        this.recoveryStatus = new RecoveryStatus();
     }
 
+    @Override public RecoveryStatus recoveryStatus() {
+        return this.recoveryStatus;
+    }
+
     @Override public String toString() {
@@ -367,26 +374,35 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
     }
 
     @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
-        RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
-        RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
-        return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
+        recoveryStatus.index().startTime(System.currentTimeMillis());
+        recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
+        recoverIndex();
+        recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
+
+        recoveryStatus.translog().startTime(System.currentTimeMillis());
+        recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
+        recoverTranslog();
+        recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
+
+        recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
+        return recoveryStatus;
     }
 
-    private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
+    private void recoverTranslog() throws IndexShardGatewayRecoveryException {
         long translogId;
         try {
             translogId = IndexReader.getCurrentVersion(store.directory());
         } catch (FileNotFoundException e) {
             // no index, that fine
             indexShard.start();
-            return RecoveryStatus.Translog.EMPTY;
+            return;
         } catch (IOException e) {
             throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e);
         }
         if (!translogContainer.blobExists("translog-" + translogId)) {
             // no recovery file found, start the shard and bail
             indexShard.start();
-            return RecoveryStatus.Translog.EMPTY;
+            return;
         }
@@ -396,8 +412,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
 
             final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
             final CountDownLatch latch = new CountDownLatch(1);
-            final AtomicInteger totalOperations = new AtomicInteger();
-            final AtomicLong totalSize = new AtomicLong();
 
             translogContainer.readBlob("translog-" + translogId, new BlobContainer.ReadBlobListener() {
                 FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
@@ -422,21 +436,20 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
                             if ((si.position() - curPos) != opSize) {
                                 logger.warn("mismatch in size, expected [{}], got [{}]", opSize, si.position() - curPos);
                             }
-                            totalOperations.incrementAndGet();
+                            recoveryStatus.translog().addTranslogOperations(1);
                             indexShard.performRecoveryOperation(operation);
                             if (si.position() >= bos.size()) {
                                 position = si.position();
                                 break;
                             }
                         } catch (Exception e) {
-                            logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, totalOperations.get());
+                            logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, recoveryStatus.translog().currentTranslogOperations());
                             ignore = true;
                             latch.countDown();
                             return;
                         }
                     }
-                    totalSize.addAndGet(position);
 
                     FastByteArrayOutputStream newBos = new FastByteArrayOutputStream();
                     int leftOver = bos.size() - position;
@@ -463,15 +476,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
             }
 
             indexShard.performRecoveryFinalization(true);
-
-            return new RecoveryStatus.Translog(totalOperations.get(), timer.stop().totalTime());
         } catch (Throwable e) {
             throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", e);
         }
     }
 
-    private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
-        StopWatch timer = new StopWatch().start();
+    private void recoverIndex() throws IndexShardGatewayRecoveryException {
         final ImmutableMap<String, BlobMetaData> indicesBlobs;
         try {
             indicesBlobs = indexContainer.listBlobs();
@@ -515,11 +525,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
             }
         }
 
+        recoveryStatus.index().files(numberOfFiles, totalSize, numberOfExistingFiles, existingTotalSize);
+
         if (logger.isTraceEnabled()) {
             logger.trace("recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize));
         }
 
-        final AtomicLong throttlingWaitTime = new AtomicLong();
         final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
         final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
         for (final BlobMetaData fileToRecover : filesToRecover) {
@@ -530,7 +541,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
                 // lets reschedule to do it next time
                 threadPool.schedule(new Runnable() {
                     @Override public void run() {
-                        throttlingWaitTime.addAndGet(recoveryThrottler.throttleInterval().millis());
+                        recoveryStatus.index().addThrottlingTime(recoveryThrottler.throttleInterval().millis());
                         if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) {
                             // we managed to get a recovery going
                             recoverFile(fileToRecover, indicesBlobs, latch, failures);
@@ -561,6 +572,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         } catch (IOException e) {
             throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
         }
+        recoveryStatus.index().updateVersion(version);
 
         /// now, go over and clean files that are in the store, but were not in the gateway
         try {
@@ -576,8 +588,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         } catch (IOException e) {
             // ignore
         }
-
-        return new RecoveryStatus.Index(version, numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize), TimeValue.timeValueMillis(throttlingWaitTime.get()), timer.stop().totalTime());
     }
 
     private void recoverFile(final BlobMetaData fileToRecover, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
@@ -606,6 +616,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         final MessageDigest digest = Digest.getMd5Digest();
         indexContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
             @Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
+                recoveryStatus.index().addCurrentFilesSize(size);
                 indexOutput.writeBytes(data, offset, size);
                 digest.update(data, offset, size);
             }
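The onPartial callback above runs on the blob container's read thread while other threads may read the running total, so the byte count goes through the AtomicLong-backed addCurrentFilesSize() rather than a plain field. A stand-alone illustration of the same accumulation pattern (class and method names are made up for the example):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a writer thread adds chunk sizes, a monitoring thread reads the total.
public class BytesRecoveredCounter {

    private final AtomicLong bytes = new AtomicLong();

    public void onPartial(byte[] data, int offset, int size) {
        bytes.addAndGet(size); // called from the blob read callback thread
    }

    public long bytesRecovered() {
        return bytes.get();    // safe to read from any other thread
    }
}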

NoneIndexShardGateway.java

@@ -39,6 +39,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
 
     private final InternalIndexShard indexShard;
 
+    private final RecoveryStatus recoveryStatus = new RecoveryStatus();
+
     @Inject public NoneIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) {
         super(shardId, indexSettings);
         this.indexShard = (InternalIndexShard) indexShard;
@@ -48,7 +50,13 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
         return "_none_";
     }
 
+    @Override public RecoveryStatus recoveryStatus() {
+        return recoveryStatus;
+    }
+
     @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
+        recoveryStatus().index().startTime(System.currentTimeMillis());
+        recoveryStatus.translog().startTime(System.currentTimeMillis());
         // in the none case, we simply start the shard
         // clean the store, there should be nothing there...
         try {
@@ -57,7 +65,9 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
             logger.warn("failed to clean store before starting shard", e);
         }
         indexShard.start();
-        return new RecoveryStatus(RecoveryStatus.Index.EMPTY, RecoveryStatus.Translog.EMPTY);
+        recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
+        recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
+        return recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
     }
 
     @Override public String type() {