diff --git a/docs/reference/indices/recovery.asciidoc b/docs/reference/indices/recovery.asciidoc index 74b30cb02ce..b50b5c4dce9 100644 --- a/docs/reference/indices/recovery.asciidoc +++ b/docs/reference/indices/recovery.asciidoc @@ -65,6 +65,9 @@ coming[1.5.0, this syntax was change to fix inconsistencies with other API] }, "translog" : { "recovered" : 0, + "total" : 0, + "percent" : "100.0%", + "total_on_start" : 0, "total_time" : "0s", "total_time_in_millis" : 0 }, diff --git a/rest-api-spec/test/cat.recovery/10_basic.yaml b/rest-api-spec/test/cat.recovery/10_basic.yaml index ae6c00581e1..89b1fb8c765 100755 --- a/rest-api-spec/test/cat.recovery/10_basic.yaml +++ b/rest-api-spec/test/cat.recovery/10_basic.yaml @@ -42,6 +42,9 @@ \d+\.\d+% \s+ # bytes_percent \d+ \s+ # total_files \d+ \s+ # total_bytes + \d+ \s+ # translog + -?\d+\.\d+% \s+ # translog_percent + -?\d+ \s+ # total_translog \n )+ $/ diff --git a/rest-api-spec/test/indices.recovery/10_basic.yaml b/rest-api-spec/test/indices.recovery/10_basic.yaml index ea1c522ab8a..86d396c2063 100644 --- a/rest-api-spec/test/indices.recovery/10_basic.yaml +++ b/rest-api-spec/test/indices.recovery/10_basic.yaml @@ -33,6 +33,8 @@ - gte: { test_1.shards.0.index.size.recovered_in_bytes: 0 } - match: { test_1.shards.0.index.size.percent: /^\d+\.\d\%$/ } - gte: { test_1.shards.0.translog.recovered: 0 } + - gte: { test_1.shards.0.translog.total: -1 } + - gte: { test_1.shards.0.translog.total_on_start: 0 } - gte: { test_1.shards.0.translog.total_time_in_millis: 0 } - gte: { test_1.shards.0.start.check_index_time_in_millis: 0 } - gte: { test_1.shards.0.start.total_time_in_millis: 0 } diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index a6787fb3537..0fc6a082acc 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -191,6 +191,8 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl if (recoveringTranslogFile == null || Files.exists(recoveringTranslogFile) == false) { // no translog files, bail + recoveryState.getTranslog().totalOperations(0); + recoveryState.getTranslog().totalOperationsOnStart(0); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from gateway, no translog"); // no index, just start the shard and bail @@ -236,7 +238,7 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl typesToUpdate.add(potentialIndexOperation.docMapper().type()); } } - recoveryState.getTranslog().addTranslogOperations(1); + recoveryState.getTranslog().incrementRecoveredOperations(); } catch (ElasticsearchException e) { if (e.status() == RestStatus.BAD_REQUEST) { // mainly for MapperParsingException and Failure to detect xcontent diff --git a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index d828ab548b5..ff170b64ba5 100644 --- a/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -132,7 +132,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem .append(new ByteSizeValue(index.reusedBytes())).append("]\n"); sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [") .append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations()) + sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]"); logger.trace(sb.toString()); } else if (logger.isDebugEnabled()) { diff --git a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java index d5ac11a704e..ce0ee16f932 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java +++ b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java @@ -115,6 +115,8 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo logger.trace("[{}] restoring shard [{}]", restoreSource.snapshotId(), shardId); } try { + recoveryState.getTranslog().totalOperations(0); + recoveryState.getTranslog().totalOperationsOnStart(0); indexShard.prepareForIndexRecovery(); IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(restoreSource.snapshotId().getRepository()); ShardId snapshotShardId = shardId; diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java index 819ae984ae9..0ff00d7c008 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.recovery; -import com.google.common.collect.Sets; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -27,7 +26,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; -import java.util.Set; /** * @@ -37,15 +35,17 @@ class RecoveryCleanFilesRequest extends TransportRequest { private long recoveryId; private ShardId shardId; - private Store.MetadataSnapshot snapshotFiles; + private Store.MetadataSnapshot snapshotFiles; + private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; RecoveryCleanFilesRequest() { } - RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles) { + RecoveryCleanFilesRequest(long recoveryId, ShardId shardId, Store.MetadataSnapshot snapshotFiles, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.snapshotFiles = snapshotFiles; + this.totalTranslogOps = totalTranslogOps; } public long recoveryId() { @@ -62,6 +62,7 @@ class RecoveryCleanFilesRequest extends TransportRequest { recoveryId = in.readLong(); shardId = ShardId.readShardId(in); snapshotFiles = Store.MetadataSnapshot.read(in); + totalTranslogOps = in.readVInt(); } @Override @@ -70,9 +71,14 @@ class RecoveryCleanFilesRequest extends TransportRequest { out.writeLong(recoveryId); shardId.writeTo(out); snapshotFiles.writeTo(out); + out.writeVInt(totalTranslogOps); } public Store.MetadataSnapshot sourceMetaSnapshot() { return snapshotFiles; } + + public int totalTranslogOps() { + return totalTranslogOps; + } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java index d27f94d9a93..5d06772069b 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java @@ -42,16 +42,20 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi private BytesReference content; private StoreFileMetaData metaData; + private int totalTranslogOps; + RecoveryFileChunkRequest() { } - public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content, boolean lastChunk) { + public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content, + boolean lastChunk, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.metaData = metaData; this.position = position; this.content = content; this.lastChunk = lastChunk; + this.totalTranslogOps = totalTranslogOps; } public long recoveryId() { @@ -83,6 +87,10 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi return content; } + public int totalTranslogOps() { + return totalTranslogOps; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -98,6 +106,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi writtenBy = Lucene.parseVersionLenient(versionString, null); metaData = new StoreFileMetaData(name, length, checksum, writtenBy); lastChunk = in.readBoolean(); + totalTranslogOps = in.readVInt(); } @Override @@ -112,6 +121,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi out.writeBytesReference(content); out.writeOptionalString(metaData.writtenBy() == null ? null : metaData.writtenBy().toString()); out.writeBoolean(lastChunk); + out.writeVInt(totalTranslogOps); } @Override diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java index 2d9fb2b0653..d28ae270f9e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java @@ -41,17 +41,20 @@ class RecoveryFilesInfoRequest extends TransportRequest { List<String> phase1ExistingFileNames; List<Long> phase1ExistingFileSizes; + int totalTranslogOps; + RecoveryFilesInfoRequest() { } RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes, - List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes) { + List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.phase1FileNames = phase1FileNames; this.phase1FileSizes = phase1FileSizes; this.phase1ExistingFileNames = phase1ExistingFileNames; this.phase1ExistingFileSizes = phase1ExistingFileSizes; + this.totalTranslogOps = totalTranslogOps; } public long recoveryId() { @@ -90,6 +93,7 @@ class RecoveryFilesInfoRequest extends TransportRequest { for (int i = 0; i < size; i++) { phase1ExistingFileSizes.add(in.readVLong()); } + totalTranslogOps = in.readVInt(); } @Override @@ -117,5 +121,6 @@ class RecoveryFilesInfoRequest extends TransportRequest { for (Long phase1ExistingFileSize : phase1ExistingFileSizes) { out.writeVLong(phase1ExistingFileSize); } + out.writeVInt(totalTranslogOps); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index e5a131843e9..dbc4a1503c1 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -33,13 +33,15 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { private long recoveryId; private ShardId shardId; + private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; RecoveryPrepareForTranslogOperationsRequest() { } - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId) { + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; + this.totalTranslogOps = totalTranslogOps; } public long recoveryId() { @@ -50,11 +52,16 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { return shardId; } + public int totalTranslogOps() { + return totalTranslogOps; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); + totalTranslogOps = in.readVInt(); } @Override @@ -62,5 +69,6 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { super.writeTo(out); out.writeLong(recoveryId); shardId.writeTo(out); + out.writeVInt(totalTranslogOps); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d3043335b93..39b8c49f8f3 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -199,7 +199,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { @Override public void run() throws InterruptedException { RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), - response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes); + response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, + shard.translog().estimatedNumberOfOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); @@ -288,7 +289,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { public void run() throws InterruptedException { // Actually send the file chunk to the target node, waiting for it to complete transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, - new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk), + new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, + lastChunk, shard.translog().estimatedNumberOfOperations()), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -350,7 +352,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { // are deleted try { transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata), + new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, shard.translog().estimatedNumberOfOperations()), TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } catch (RemoteTransportException remoteException) { @@ -427,7 +429,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { // operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), + new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), shard.translog().estimatedNumberOfOperations()), TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -616,7 +618,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { cancellableThreads.execute(new Interruptable() { @Override public void run() throws InterruptedException { - final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); + final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( + request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } @@ -633,7 +636,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { cancellableThreads.execute(new Interruptable() { @Override public void run() throws InterruptedException { - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); + RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( + request.recoveryId(), request.shardId(), operations, shard.translog().estimatedNumberOfOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 4e6a747530a..36289d9ef53 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -39,7 +39,6 @@ import java.io.IOException; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * Keeps track of state related to shard recovery. @@ -359,6 +358,7 @@ public class RecoveryState implements ToXContent, Streamable { static final XContentBuilderString TARGET = new XContentBuilderString("target"); static final XContentBuilderString INDEX = new XContentBuilderString("index"); static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog"); + static final XContentBuilderString TOTAL_ON_START = new XContentBuilderString("total_on_start"); static final XContentBuilderString START = new XContentBuilderString("start"); static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered"); static final XContentBuilderString RECOVERED_IN_BYTES = new XContentBuilderString("recovered_in_bytes"); @@ -473,40 +473,90 @@ public class RecoveryState implements ToXContent, Streamable { } public static class Translog extends Timer implements ToXContent, Streamable { - private final AtomicInteger currentTranslogOperations = new AtomicInteger(); + public static final int UNKNOWN = -1; - public void reset() { + private int recovered; + private int total = UNKNOWN; + private int totalOnStart = UNKNOWN; + + public synchronized void reset() { super.reset(); - currentTranslogOperations.set(0); + recovered = 0; + total = UNKNOWN; + totalOnStart = UNKNOWN; } - public void addTranslogOperations(int count) { - this.currentTranslogOperations.addAndGet(count); + public synchronized void incrementRecoveredOperations() { + recovered++; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; } - public void incrementTranslogOperations() { - this.currentTranslogOperations.incrementAndGet(); + /** returns the total number of translog operations recovered so far */ + public synchronized int recoveredOperations() { + return recovered; } - public int currentTranslogOperations() { - return this.currentTranslogOperations.get(); + /** + * returns the total number of translog operations needed to be recovered at this moment. + * Note that this can change as the number of operations grows during recovery. + * <p/> + * A value of -1 ({@link RecoveryState.Translog#UNKNOWN} is return if this is unknown (typically a gateway recovery) + */ + public synchronized int totalOperations() { + return total; + } + + public synchronized void totalOperations(int total) { + this.total = total; + assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; + } + + /** + * returns the total number of translog operations to recovered, on the start of the recovery. Unlike {@link #totalOperations} + * this does change during recovery. + * <p/> + * A value of -1 ({@link RecoveryState.Translog#UNKNOWN} is return if this is unknown (typically a gateway recovery) + */ + public synchronized int totalOperationsOnStart() { + return this.totalOnStart; + } + + public synchronized void totalOperationsOnStart(int total) { + this.totalOnStart = total; + } + + public synchronized float recoveredPercent() { + if (total == UNKNOWN) { + return -1.f; + } + if (total == 0) { + return 100.f; + } + return recovered * 100.0f / total; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - currentTranslogOperations.set(in.readVInt()); + recovered = in.readVInt(); + total = in.readVInt(); + totalOnStart = in.readVInt(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(currentTranslogOperations.get()); + out.writeVInt(recovered); + out.writeVInt(total); + out.writeVInt(totalOnStart); } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(Fields.RECOVERED, currentTranslogOperations.get()); + public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.RECOVERED, recovered); + builder.field(Fields.TOTAL, total); + builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredPercent())); + builder.field(Fields.TOTAL_ON_START, totalOnStart); builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time()); return builder; } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 3b72a3059d2..437560c16d8 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -277,6 +277,7 @@ public class RecoveryTarget extends AbstractComponent { public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); + recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps()); recoveryStatus.indexShard().prepareForTranslogRecovery(); } channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -322,9 +323,11 @@ public class RecoveryTarget extends AbstractComponent { public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); + final RecoveryState.Translog translog = recoveryStatus.state().getTranslog(); + translog.totalOperations(request.totalTranslogOps()); for (Translog.Operation operation : request.operations()) { recoveryStatus.indexShard().performRecoveryOperation(operation); - recoveryStatus.state().getTranslog().incrementTranslogOperations(); + translog.incrementRecoveredOperations(); } } channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -355,6 +358,8 @@ public class RecoveryTarget extends AbstractComponent { for (int i = 0; i < request.phase1FileNames.size(); i++) { index.addFileDetail(request.phase1FileNames.get(i), request.phase1FileSizes.get(i), false); } + recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps); + recoveryStatus.state().getTranslog().totalOperationsOnStart(request.totalTranslogOps); // recoveryBytesCount / recoveryFileCount will be set as we go... channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -377,6 +382,7 @@ public class RecoveryTarget extends AbstractComponent { public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); + recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps()); // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas // to recover from in case of a full cluster shutdown just when this code executes... @@ -425,6 +431,7 @@ public class RecoveryTarget extends AbstractComponent { try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) { final RecoveryStatus recoveryStatus = statusRef.status(); final Store store = recoveryStatus.store(); + recoveryStatus.state().getTranslog().totalOperations(request.totalTranslogOps()); IndexOutput indexOutput; if (request.position() == 0) { indexOutput = recoveryStatus.openAndPutIndexOutput(request.name(), request.metadata(), store); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index f5729fce9db..30c693d827e 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -20,12 +20,11 @@ package org.elasticsearch.indices.recovery; import com.google.common.collect.Lists; -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -39,14 +38,16 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { private long recoveryId; private ShardId shardId; private List<Translog.Operation> operations; + private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations) { + RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; + this.totalTranslogOps = totalTranslogOps; } public long recoveryId() { @@ -61,6 +62,10 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { return operations; } + public int totalTranslogOps() { + return totalTranslogOps; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -71,6 +76,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { for (int i = 0; i < size; i++) { operations.add(TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.read(in)); } + totalTranslogOps = in.readVInt(); } @Override @@ -82,5 +88,6 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { for (Translog.Operation operation : operations) { TranslogStreams.CHECKSUMMED_TRANSLOG_STREAM.write(out, operation); } + out.writeVInt(totalTranslogOps); } } diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java index 87053d72f4a..a7a7eb53e85 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java @@ -98,6 +98,9 @@ public class RestRecoveryAction extends AbstractCatAction { .addCell("bytes_percent", "alias:bp;desc:percent of bytes recovered") .addCell("total_files", "alias:tf;desc:total number of files") .addCell("total_bytes", "alias:tb;desc:total number of bytes") + .addCell("translog", "alias:tr;desc:translog operations recovered") + .addCell("translog_percent", "alias:trp;desc:percent of translog recovery") + .addCell("total_translog", "alias:trt;desc:current total translog operations") .endHeaders(); return t; } @@ -156,6 +159,9 @@ public class RestRecoveryAction extends AbstractCatAction { t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent())); t.addCell(state.getIndex().totalFileCount()); t.addCell(state.getIndex().totalBytes()); + t.addCell(state.getTranslog().recoveredOperations()); + t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getTranslog().recoveredPercent())); + t.addCell(state.getTranslog().totalOperations()); t.endRow(); } } diff --git a/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java b/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java index c6a99b040fa..85383dd46b5 100644 --- a/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java @@ -132,7 +132,7 @@ public class ReplicaRecoveryBenchmark { long translogOps; long bytes; if (indexRecoveries.size() > 0) { - translogOps = indexRecoveries.get(0).recoveryState().getTranslog().currentTranslogOperations(); + translogOps = indexRecoveries.get(0).recoveryState().getTranslog().recoveredOperations(); bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes(); } else { bytes = lastBytes = 0; diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index ed2463d69de..a7365c4b59d 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.store; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists; import com.carrotsearch.randomizedtesting.LifecycleScope; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.google.common.base.Charsets; import com.google.common.base.Predicate; @@ -51,7 +50,6 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.IndexShard; @@ -406,7 +404,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; if (truncate && req.length() > 1) { BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int) req.length() - 1); - request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk()); + request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk(), req.totalTranslogOps()); } else { byte[] array = req.content().array(); int i = randomIntBetween(0, req.content().length() - 1); diff --git a/src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java index 12bf29aecf7..5d96a4d8f2e 100644 --- a/src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java +++ b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStateTest.java @@ -343,19 +343,33 @@ public class RecoveryStateTest extends ElasticsearchTestCase { // we don't need to test the time aspect, it's done in the timer test translog.start(); - assertThat(translog.currentTranslogOperations(), equalTo(0)); + assertThat(translog.recoveredOperations(), equalTo(0)); + assertThat(translog.totalOperations(), equalTo(Translog.UNKNOWN)); + assertThat(translog.totalOperationsOnStart(), equalTo(Translog.UNKNOWN)); streamer.start(); // force one streamer.serializeDeserialize(); int ops = 0; + int totalOps = 0; + int totalOpsOnStart = randomIntBetween(10, 200); + translog.totalOperationsOnStart(totalOpsOnStart); for (int i = scaledRandomIntBetween(10, 200); i > 0; i--) { - for (int j = randomIntBetween(1, 10); j > 0; j--) { + final int iterationOps = randomIntBetween(1, 10); + totalOps += iterationOps; + translog.totalOperations(totalOps); + assertThat((double) translog.recoveredPercent(), closeTo(100.0 * ops / totalOps, 0.1)); + for (int j = iterationOps; j > 0; j--) { ops++; - translog.incrementTranslogOperations(); + translog.incrementRecoveredOperations(); } - assertThat(translog.currentTranslogOperations(), equalTo(ops)); - assertThat(streamer.lastRead().currentTranslogOperations(), greaterThanOrEqualTo(0)); - assertThat(streamer.lastRead().currentTranslogOperations(), lessThanOrEqualTo(ops)); + assertThat(translog.recoveredOperations(), equalTo(ops)); + assertThat(translog.totalOperations(), equalTo(totalOps)); + assertThat(translog.recoveredPercent(), equalTo(100.f)); + assertThat(streamer.lastRead().recoveredOperations(), greaterThanOrEqualTo(0)); + assertThat(streamer.lastRead().recoveredOperations(), lessThanOrEqualTo(ops)); + assertThat(streamer.lastRead().totalOperations(), lessThanOrEqualTo(totalOps)); + assertThat(streamer.lastRead().totalOperationsOnStart(), lessThanOrEqualTo(totalOpsOnStart)); + assertThat(streamer.lastRead().recoveredPercent(), either(greaterThanOrEqualTo(0.f)).or(equalTo(-1.f))); } boolean stopped = false; @@ -367,13 +381,19 @@ public class RecoveryStateTest extends ElasticsearchTestCase { if (randomBoolean()) { translog.reset(); ops = 0; - assertThat(translog.currentTranslogOperations(), equalTo(0)); + totalOps = Translog.UNKNOWN; + totalOpsOnStart = Translog.UNKNOWN; + assertThat(translog.recoveredOperations(), equalTo(0)); + assertThat(translog.totalOperationsOnStart(), equalTo(Translog.UNKNOWN)); + assertThat(translog.totalOperations(), equalTo(Translog.UNKNOWN)); } stop.set(true); streamer.join(); final Translog lastRead = streamer.lastRead(); - assertThat(lastRead.currentTranslogOperations(), equalTo(ops)); + assertThat(lastRead.recoveredOperations(), equalTo(ops)); + assertThat(lastRead.totalOperations(), equalTo(totalOps)); + assertThat(lastRead.totalOperationsOnStart(), equalTo(totalOpsOnStart)); assertThat(lastRead.startTime(), equalTo(translog.startTime())); assertThat(lastRead.stopTime(), equalTo(translog.stopTime()));