diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 86776ec4e3d..c465e71f6c5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -27,6 +27,8 @@ import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.translog.Translog; +import java.util.concurrent.atomic.AtomicLong; + import static org.elasticsearch.common.unit.TimeValue.*; /** @@ -36,6 +38,8 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo String type(); + RecoveryStatus recoveryStatus(); + /** * Recovers the state of the shard from the gateway. */ @@ -195,13 +199,26 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo class RecoveryStatus { - private Index index; + public static enum Stage { + NONE, + INDEX, + TRANSLOG, + DONE + } - private Translog translog; + private Stage stage = Stage.NONE; - public RecoveryStatus(Index index, Translog translog) { - this.index = index; - this.translog = translog; + private Index index = new Index(); + + private Translog translog = new Translog(); + + public Stage stage() { + return this.stage; + } + + public RecoveryStatus updateStage(Stage stage) { + this.stage = stage; + return this; } public Index index() { @@ -213,45 +230,60 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo } public static class Translog { - public static final Translog EMPTY = new Translog(0, TimeValue.timeValueMillis(0)); + volatile long currentTranslogOperations = 0; + private long startTime = -1; + private long took; - private int numberOfOperations; - private TimeValue took; - - public Translog(int numberOfOperations, TimeValue took) { - this.numberOfOperations = numberOfOperations; - this.took = took; + public long startTime() { + return this.startTime; } - public int numberOfOperations() { - return numberOfOperations; + public void startTime(long startTime) { + this.startTime = startTime; } public TimeValue took() { - return this.took; + return new TimeValue(this.took); + } + + public void took(long took) { + this.took = took; + } + + public void addTranslogOperations(long count) { + this.currentTranslogOperations += count; + } + + public long currentTranslogOperations() { + return this.currentTranslogOperations; } } public static class Index { - public static final Index EMPTY = new Index(-1, 0, new ByteSizeValue(0), 0, new ByteSizeValue(0), timeValueMillis(0), timeValueMillis(0)); + private long startTime = -1; + private long took = -1; - private long version; - private int numberOfFiles; - private ByteSizeValue totalSize; - private int numberOfExistingFiles; - private ByteSizeValue existingTotalSize; - private TimeValue throttlingWaitTime; - private TimeValue took; + private long version = -1; + private int numberOfFiles = 0; + private long totalSize = 0; + private int numberOfExistingFiles = 0; + private long existingTotalSize = 0; + private AtomicLong throttlingWaitTime = new AtomicLong(); + private AtomicLong currentFilesSize = new AtomicLong(); - public Index(long version, int numberOfFiles, ByteSizeValue totalSize, - int numberOfExistingFiles, ByteSizeValue existingTotalSize, - TimeValue throttlingWaitTime, TimeValue took) { - this.version = version; - this.numberOfFiles = numberOfFiles; - this.totalSize = totalSize; - this.numberOfExistingFiles = numberOfExistingFiles; - this.existingTotalSize = existingTotalSize; - this.throttlingWaitTime = throttlingWaitTime; + public long startTime() { + return this.startTime; + } + + public void startTime(long startTime) { + this.startTime = startTime; + } + + public TimeValue took() { + return new TimeValue(this.took); + } + + public void took(long took) { this.took = took; } @@ -259,12 +291,19 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo return this.version; } + public void files(int numberOfFiles, long totalSize, int numberOfExistingFiles, long existingTotalSize) { + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; + this.numberOfExistingFiles = numberOfExistingFiles; + this.existingTotalSize = existingTotalSize; + } + public int numberOfFiles() { return numberOfFiles; } public ByteSizeValue totalSize() { - return totalSize; + return new ByteSizeValue(totalSize); } public int numberOfExistingFiles() { @@ -272,15 +311,27 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo } public ByteSizeValue existingTotalSize() { - return existingTotalSize; + return new ByteSizeValue(existingTotalSize); + } + + public void addThrottlingTime(long delta) { + throttlingWaitTime.addAndGet(delta); } public TimeValue throttlingWaitTime() { - return throttlingWaitTime; + return new TimeValue(throttlingWaitTime.get()); } - public TimeValue took() { - return this.took; + public void updateVersion(long version) { + this.version = version; + } + + public long currentFilesSize() { + return this.currentFilesSize.get(); + } + + public void addCurrentFilesSize(long updatedSize) { + this.currentFilesSize.addAndGet(updatedSize); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 291f60373a3..5085ad83d34 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -159,7 +159,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n"); sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n"); sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]"); + sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]"); logger.debug(sb.toString()); } // refresh the shard diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index c73af896be1..b3156688422 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -88,6 +88,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo protected final RecoveryThrottler recoveryThrottler; + private final RecoveryStatus recoveryStatus; protected final ByteSizeValue chunkSize; @@ -122,6 +123,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo this.indexContainer = blobStore.immutableBlobContainer(blobStoreIndexGateway.shardIndexPath(shardId.id())); this.translogContainer = blobStore.appendableBlobContainer(blobStoreIndexGateway.shardTranslogPath(shardId.id())); + + this.recoveryStatus = new RecoveryStatus(); + } + + @Override public RecoveryStatus recoveryStatus() { + return this.recoveryStatus; } @Override public String toString() { @@ -367,26 +374,35 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { - RecoveryStatus.Index recoveryStatusIndex = recoverIndex(); - RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog(); - return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog); + recoveryStatus.index().startTime(System.currentTimeMillis()); + recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX); + recoverIndex(); + recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime()); + + recoveryStatus.translog().startTime(System.currentTimeMillis()); + recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG); + recoverTranslog(); + recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime()); + + recoveryStatus.updateStage(RecoveryStatus.Stage.DONE); + return recoveryStatus; } - private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException { + private void recoverTranslog() throws IndexShardGatewayRecoveryException { long translogId; try { translogId = IndexReader.getCurrentVersion(store.directory()); } catch (FileNotFoundException e) { // no index, that fine indexShard.start(); - return RecoveryStatus.Translog.EMPTY; + return; } catch (IOException e) { throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e); } if (!translogContainer.blobExists("translog-" + translogId)) { // no recovery file found, start the shard and bail indexShard.start(); - return RecoveryStatus.Translog.EMPTY; + return; } @@ -396,8 +412,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo final AtomicReference failure = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger totalOperations = new AtomicInteger(); - final AtomicLong totalSize = new AtomicLong(); translogContainer.readBlob("translog-" + translogId, new BlobContainer.ReadBlobListener() { FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); @@ -422,21 +436,20 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo if ((si.position() - curPos) != opSize) { logger.warn("mismatch in size, expected [{}], got [{}]", opSize, si.position() - curPos); } - totalOperations.incrementAndGet(); + recoveryStatus.translog().addTranslogOperations(1); indexShard.performRecoveryOperation(operation); if (si.position() >= bos.size()) { position = si.position(); break; } } catch (Exception e) { - logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, totalOperations.get()); + logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, recoveryStatus.translog().currentTranslogOperations()); ignore = true; latch.countDown(); return; } } - totalSize.addAndGet(position); FastByteArrayOutputStream newBos = new FastByteArrayOutputStream(); int leftOver = bos.size() - position; @@ -463,15 +476,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } indexShard.performRecoveryFinalization(true); - - return new RecoveryStatus.Translog(totalOperations.get(), timer.stop().totalTime()); } catch (Throwable e) { throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", e); } } - private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException { - StopWatch timer = new StopWatch().start(); + private void recoverIndex() throws IndexShardGatewayRecoveryException { final ImmutableMap indicesBlobs; try { indicesBlobs = indexContainer.listBlobs(); @@ -515,11 +525,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } } + recoveryStatus.index().files(numberOfFiles, totalSize, numberOfExistingFiles, existingTotalSize); + if (logger.isTraceEnabled()) { logger.trace("recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize)); } - final AtomicLong throttlingWaitTime = new AtomicLong(); final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); for (final BlobMetaData fileToRecover : filesToRecover) { @@ -530,7 +541,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // lets reschedule to do it next time threadPool.schedule(new Runnable() { @Override public void run() { - throttlingWaitTime.addAndGet(recoveryThrottler.throttleInterval().millis()); + recoveryStatus.index().addThrottlingTime(recoveryThrottler.throttleInterval().millis()); if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) { // we managed to get a recovery going recoverFile(fileToRecover, indicesBlobs, latch, failures); @@ -561,6 +572,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } catch (IOException e) { throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); } + recoveryStatus.index().updateVersion(version); /// now, go over and clean files that are in the store, but were not in the gateway try { @@ -576,8 +588,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } catch (IOException e) { // ignore } - - return new RecoveryStatus.Index(version, numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize), TimeValue.timeValueMillis(throttlingWaitTime.get()), timer.stop().totalTime()); } private void recoverFile(final BlobMetaData fileToRecover, final ImmutableMap blobs, final CountDownLatch latch, final List failures) { @@ -606,6 +616,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo final MessageDigest digest = Digest.getMd5Digest(); indexContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() { @Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { + recoveryStatus.index().addCurrentFilesSize(size); indexOutput.writeBytes(data, offset, size); digest.update(data, offset, size); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index b7af93be285..575be66e9ba 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -39,6 +39,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement private final InternalIndexShard indexShard; + private final RecoveryStatus recoveryStatus = new RecoveryStatus(); + @Inject public NoneIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) { super(shardId, indexSettings); this.indexShard = (InternalIndexShard) indexShard; @@ -48,7 +50,13 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement return "_none_"; } + @Override public RecoveryStatus recoveryStatus() { + return recoveryStatus; + } + @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { + recoveryStatus().index().startTime(System.currentTimeMillis()); + recoveryStatus.translog().startTime(System.currentTimeMillis()); // in the none case, we simply start the shard // clean the store, there should be nothing there... try { @@ -57,7 +65,9 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement logger.warn("failed to clean store before starting shard", e); } indexShard.start(); - return new RecoveryStatus(RecoveryStatus.Index.EMPTY, RecoveryStatus.Translog.EMPTY); + recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime()); + recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime()); + return recoveryStatus.updateStage(RecoveryStatus.Stage.DONE); } @Override public String type() {