add snapshot status (either current one or the latest one executed) to indices status api

This commit is contained in:
kimchy 2010-08-18 17:11:29 +03:00
parent 91aada2706
commit a3107bc5b1
7 changed files with 243 additions and 37 deletions

View File

@ -0,0 +1,124 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
*/
public class GatewaySnapshotStatus {
public static enum Stage {
NONE((byte) 0),
INDEX((byte) 1),
TRANSLOG((byte) 2),
FINALIZE((byte) 3),
DONE((byte) 4),
FAILURE((byte) 5);
private final byte value;
Stage(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
public static Stage fromValue(byte value) {
if (value == 0) {
return Stage.NONE;
} else if (value == 1) {
return Stage.INDEX;
} else if (value == 2) {
return Stage.TRANSLOG;
} else if (value == 3) {
return Stage.FINALIZE;
} else if (value == 4) {
return Stage.DONE;
} else if (value == 5) {
return Stage.FAILURE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + "]");
}
}
final Stage stage;
final long startTime;
final long time;
final long indexSize;
final long translogOperations;
public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize, long translogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.indexSize = indexSize;
this.translogOperations = translogOperations;
}
public Stage stage() {
return this.stage;
}
public Stage getStage() {
return stage();
}
public long startTime() {
return this.startTime;
}
public long getStartTime() {
return startTime();
}
public TimeValue time() {
return TimeValue.timeValueMillis(time);
}
public TimeValue getTime() {
return time();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}
public ByteSizeValue getIndexSize() {
return indexSize();
}
public long translogOperations() {
return this.translogOperations;
}
public long getTranslogOperations() {
return translogOperations();
}
}

View File

@ -84,6 +84,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
GatewayRecoveryStatus gatewayRecoveryStatus;
GatewaySnapshotStatus gatewaySnapshotStatus;
ShardStatus() {
}
@ -156,6 +158,14 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return gatewayRecoveryStatus();
}
public GatewaySnapshotStatus gatewaySnapshotStatus() {
return gatewaySnapshotStatus;
}
public GatewaySnapshotStatus getGatewaySnapshotStatus() {
return gatewaySnapshotStatus();
}
public static ShardStatus readIndexShardStatus(StreamInput in) throws IOException {
ShardStatus shardStatus = new ShardStatus();
shardStatus.readFrom(in);
@ -210,6 +220,17 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeVLong(gatewayRecoveryStatus.recoveredIndexSize);
out.writeVLong(gatewayRecoveryStatus.recoveredTranslogOperations);
}
if (gatewaySnapshotStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(gatewaySnapshotStatus.stage.value());
out.writeVLong(gatewaySnapshotStatus.startTime);
out.writeVLong(gatewaySnapshotStatus.time);
out.writeVLong(gatewaySnapshotStatus.indexSize);
out.writeVLong(gatewaySnapshotStatus.translogOperations);
}
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -236,5 +257,10 @@ public class ShardStatus extends BroadcastShardOperationResponse {
gatewayRecoveryStatus = new GatewayRecoveryStatus(GatewayRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
if (in.readBoolean()) {
gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.RecoveryStatus;
@ -220,6 +221,33 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
gatewayRecoveryStatus.index().retryTime(), gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
}
SnapshotStatus snapshotStatus = gatewayService.snapshotStatus();
if (snapshotStatus != null) {
GatewaySnapshotStatus.Stage stage;
switch (snapshotStatus.stage()) {
case DONE:
stage = GatewaySnapshotStatus.Stage.DONE;
break;
case FAILURE:
stage = GatewaySnapshotStatus.Stage.FAILURE;
break;
case TRANSLOG:
stage = GatewaySnapshotStatus.Stage.TRANSLOG;
break;
case FINALIZE:
stage = GatewaySnapshotStatus.Stage.FINALIZE;
break;
case INDEX:
stage = GatewaySnapshotStatus.Stage.INDEX;
break;
default:
stage = GatewaySnapshotStatus.Stage.NONE;
break;
}
shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(),
snapshotStatus.index().totalSize(), snapshotStatus.translog().currentTranslogOperations());
}
return shardStatus;
}

View File

@ -115,6 +115,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return recoveryStatus;
}
public SnapshotStatus snapshotStatus() {
SnapshotStatus snapshotStatus = shardGateway.currentSnapshotStatus();
if (snapshotStatus != null) {
return snapshotStatus;
}
return shardGateway.lastSnapshotStatus();
}
/**
* Recovers the state of the shard from the gateway.
*/
@ -241,9 +249,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (snapshotStatus != null) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(snapshotStatus.took()).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().took()).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().currentTranslogOperations()).append("], took [").append(snapshotStatus.translog().took()).append("]");
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
logger.debug(sb.toString());
}
}

View File

@ -19,9 +19,6 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
*/
@ -40,7 +37,7 @@ public class SnapshotStatus {
private long startTime;
private long took;
private long time;
private Index index = new Index();
@ -65,12 +62,12 @@ public class SnapshotStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long time() {
return this.time;
}
public void took(long took) {
this.took = took;
public void time(long time) {
this.time = time;
}
public void failed(Throwable failure) {
@ -87,10 +84,10 @@ public class SnapshotStatus {
public static class Index {
private long startTime;
private long took;
private long time;
private int numberOfFiles;
private long totalSize = -1;
private long totalSize;
public long startTime() {
return this.startTime;
@ -100,12 +97,12 @@ public class SnapshotStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long time() {
return this.time;
}
public void took(long took) {
this.took = took;
public void time(long time) {
this.time = time;
}
public void files(int numberOfFiles, long totalSize) {
@ -117,16 +114,16 @@ public class SnapshotStatus {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return new ByteSizeValue(totalSize);
public long totalSize() {
return totalSize;
}
}
public static class Translog {
private volatile int currentTranslogOperations;
private long startTime = -1;
private long took;
private long startTime;
private long time;
public long startTime() {
return this.startTime;
@ -136,12 +133,12 @@ public class SnapshotStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long time() {
return this.time;
}
public void took(long took) {
this.took = took;
public void time(long time) {
this.time = time;
}
public void addTranslogOperations(long count) {

View File

@ -151,7 +151,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
@Override public SnapshotStatus currentSnapshotStatus() {
return this.currentSnapshotStatus;
SnapshotStatus snapshotStatus = this.currentSnapshotStatus;
if (snapshotStatus == null) {
return snapshotStatus;
}
if (snapshotStatus.stage() != SnapshotStatus.Stage.DONE || snapshotStatus.stage() != SnapshotStatus.Stage.FAILURE) {
snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime());
}
return snapshotStatus;
}
@Override public SnapshotStatus snapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
@ -160,10 +167,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
try {
doSnapshot(snapshot);
currentSnapshotStatus.took(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.time(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.DONE);
} catch (Exception e) {
currentSnapshotStatus.took(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.time(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FAILURE);
currentSnapshotStatus.failed(e);
if (e instanceof IndexShardGatewaySnapshotFailedException) {
@ -192,7 +199,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
long indexTime = 0;
if (snapshot.indexChanged()) {
long time = System.currentTimeMillis();
indexDirty = true;
@ -249,7 +255,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
currentSnapshotStatus.index().files(indexNumberOfFiles, indexTotalFilesSize);
currentSnapshotStatus.index().files(indexNumberOfFiles + 1 /* for the segment */, indexTotalFilesSize);
try {
latch.await();
@ -259,10 +265,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (!failures.isEmpty()) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
}
indexTime = System.currentTimeMillis() - time;
}
currentSnapshotStatus.index().took(System.currentTimeMillis() - currentSnapshotStatus.index().startTime());
currentSnapshotStatus.index().time(System.currentTimeMillis() - currentSnapshotStatus.index().startTime());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.TRANSLOG);
currentSnapshotStatus.translog().startTime(System.currentTimeMillis());
@ -326,12 +331,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
currentSnapshotStatus.translog().took(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
currentSnapshotStatus.translog().time(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
// now write the segments file
if (indexDirty) {
try {
indexNumberOfFiles++;
if (indicesBlobs.containsKey(snapshotIndexCommit.getSegmentsFileName())) {
cachedMd5.remove(snapshotIndexCommit.getSegmentsFileName());
indexContainer.deleteBlob(snapshotIndexCommit.getSegmentsFileName());
@ -348,7 +352,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (!failures.isEmpty()) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (segment index file)", failures.get(failures.size() - 1));
}
indexTime += (System.currentTimeMillis() - time);
} catch (Exception e) {
if (e instanceof IndexShardGatewaySnapshotFailedException) {
throw (IndexShardGatewaySnapshotFailedException) e;

View File

@ -144,7 +144,7 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("stage", peerRecoveryStatus.stage());
builder.field("start_time_in_millis", peerRecoveryStatus.startTime());
builder.field("time", peerRecoveryStatus.time());
builder.field("took_in_millis", peerRecoveryStatus.time().millis());
builder.field("time_in_millis", peerRecoveryStatus.time().millis());
builder.field("throttling_time", peerRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", peerRecoveryStatus.throttlingTime().millis());
@ -172,7 +172,7 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("stage", gatewayRecoveryStatus.stage());
builder.field("start_time_in_millis", gatewayRecoveryStatus.startTime());
builder.field("time", gatewayRecoveryStatus.time());
builder.field("took_in_millis", gatewayRecoveryStatus.time().millis());
builder.field("time_in_millis", gatewayRecoveryStatus.time().millis());
builder.field("throttling_time", gatewayRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", gatewayRecoveryStatus.throttlingTime().millis());
@ -196,6 +196,26 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.endObject();
}
if (shardStatus.gatewaySnapshotStatus() != null) {
GatewaySnapshotStatus gatewaySnapshotStatus = shardStatus.gatewaySnapshotStatus();
builder.startObject("gateway_snapshot");
builder.field("stage", gatewaySnapshotStatus.stage());
builder.field("start_time_in_millis", gatewaySnapshotStatus.startTime());
builder.field("time", gatewaySnapshotStatus.time());
builder.field("time_in_millis", gatewaySnapshotStatus.time().millis());
builder.startObject("index");
builder.field("size", gatewaySnapshotStatus.indexSize());
builder.field("size_in_bytes", gatewaySnapshotStatus.indexSize().bytes());
builder.endObject();
builder.startObject("translog");
builder.field("operations", gatewaySnapshotStatus.translogOperations());
builder.endObject();
builder.endObject();
}
builder.endObject();
}
builder.endArray();