better logging of recovery
This commit is contained in:
parent
a5838dc403
commit
3698bcc2b9
|
@ -197,6 +197,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
|||
sb.append(" index : files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took[").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n");
|
||||
sb.append(" : recovered_files [").append(recoveryStatus.index().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().recoveredTotalSize())).append("]\n");
|
||||
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfReusedFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().reusedTotalSize())).append("]\n");
|
||||
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryStatus.start().time())).append("], check_index [").append(timeValueMillis(recoveryStatus.start().checkIndexTime())).append("]\n");
|
||||
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]");
|
||||
logger.debug(sb.toString());
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ public class RecoveryStatus {
|
|||
public static enum Stage {
|
||||
INIT,
|
||||
INDEX,
|
||||
START,
|
||||
TRANSLOG,
|
||||
DONE
|
||||
}
|
||||
|
@ -43,6 +44,8 @@ public class RecoveryStatus {
|
|||
|
||||
private Translog translog = new Translog();
|
||||
|
||||
private Start start = new Start();
|
||||
|
||||
public Stage stage() {
|
||||
return this.stage;
|
||||
}
|
||||
|
@ -72,10 +75,44 @@ public class RecoveryStatus {
|
|||
return index;
|
||||
}
|
||||
|
||||
public Start start() {
|
||||
return this.start;
|
||||
}
|
||||
|
||||
public Translog translog() {
|
||||
return translog;
|
||||
}
|
||||
|
||||
public static class Start {
|
||||
private long startTime;
|
||||
private long time;
|
||||
private long checkIndexTime;
|
||||
|
||||
public long startTime() {
|
||||
return this.startTime;
|
||||
}
|
||||
|
||||
public void startTime(long startTime) {
|
||||
this.startTime = startTime;
|
||||
}
|
||||
|
||||
public long time() {
|
||||
return this.time;
|
||||
}
|
||||
|
||||
public void time(long time) {
|
||||
this.time = time;
|
||||
}
|
||||
|
||||
public long checkIndexTime() {
|
||||
return checkIndexTime;
|
||||
}
|
||||
|
||||
public void checkIndexTime(long checkIndexTime) {
|
||||
this.checkIndexTime = checkIndexTime;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Translog {
|
||||
private long startTime = 0;
|
||||
private long time;
|
||||
|
|
|
@ -405,8 +405,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
}
|
||||
recoveryStatus.index().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
recoveryStatus.translog().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -417,14 +415,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
}
|
||||
try {
|
||||
recoveryStatus.index().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
|
||||
recoverIndex(commitPoint, blobs);
|
||||
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
|
||||
recoveryStatus.translog().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
|
||||
recoverTranslog(commitPoint, blobs);
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover commit_point [" + commitPoint.name() + "]/[" + commitPoint.version() + "]", e);
|
||||
|
@ -436,12 +430,23 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
private void recoverTranslog(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws IndexShardGatewayRecoveryException {
|
||||
if (commitPoint.translogFiles().isEmpty()) {
|
||||
// no translog files, bail
|
||||
recoveryStatus.start().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
|
||||
indexShard.start("post recovery from gateway, no translog");
|
||||
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
|
||||
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
recoveryStatus.start().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
|
||||
indexShard.performRecoveryPrepareForTranslog();
|
||||
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
|
||||
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
|
||||
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
|
||||
recoveryStatus.translog().startTime(System.currentTimeMillis());
|
||||
|
||||
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -529,12 +534,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
}
|
||||
|
||||
indexShard.performRecoveryFinalization(true);
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
|
||||
} catch (Throwable e) {
|
||||
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void recoverIndex(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws Exception {
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
|
||||
int numberOfFiles = 0;
|
||||
long totalSize = 0;
|
||||
int numberOfReusedFiles = 0;
|
||||
|
|
|
@ -91,6 +91,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
|
|||
@Override
|
||||
public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
|
||||
recoveryStatus.index().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
|
||||
long version = -1;
|
||||
long translogId = -1;
|
||||
try {
|
||||
|
@ -124,12 +125,14 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
|
|||
// ignore
|
||||
}
|
||||
|
||||
recoveryStatus.translog().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.start().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
|
||||
if (translogId == -1) {
|
||||
// no translog files, bail
|
||||
indexShard.start("post recovery from gateway, no translog");
|
||||
// no index, just start the shard and bail
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
|
||||
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -163,12 +166,18 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
|
|||
// no translog files, bail
|
||||
indexShard.start("post recovery from gateway, no translog");
|
||||
// no index, just start the shard and bail
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
|
||||
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
|
||||
return;
|
||||
}
|
||||
|
||||
// recover from the translog file
|
||||
indexShard.performRecoveryPrepareForTranslog();
|
||||
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
|
||||
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
|
||||
|
||||
recoveryStatus.translog().startTime(System.currentTimeMillis());
|
||||
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
|
||||
try {
|
||||
InputStreamStreamInput si = new InputStreamStreamInput(new FileInputStream(recoveringTranslogFile));
|
||||
while (true) {
|
||||
|
@ -195,7 +204,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
|
|||
|
||||
recoveringTranslogFile.delete();
|
||||
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
|
||||
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -115,6 +115,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
|
||||
private final boolean checkIndexOnStartup;
|
||||
|
||||
private long checkIndexTook = 0;
|
||||
|
||||
private volatile IndexShardState state;
|
||||
|
||||
private TimeValue refreshInterval;
|
||||
|
@ -550,6 +552,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
}
|
||||
}
|
||||
|
||||
public long checkIndexTook() {
|
||||
return this.checkIndexTook;
|
||||
}
|
||||
|
||||
/**
|
||||
* After the store has been recovered, we need to start the engine in order to apply operations
|
||||
*/
|
||||
|
@ -830,6 +836,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
|
||||
private void checkIndex(boolean throwException) throws IndexShardException {
|
||||
try {
|
||||
checkIndexTook = 0;
|
||||
long time = System.currentTimeMillis();
|
||||
if (!IndexReader.indexExists(store.directory())) {
|
||||
return;
|
||||
}
|
||||
|
@ -853,6 +861,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
logger.debug("check index [success]\n{}", new String(os.underlyingBytes(), 0, os.size()));
|
||||
}
|
||||
}
|
||||
checkIndexTook = System.currentTimeMillis() - time;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to check index", e);
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ class RecoveryResponse implements Streamable {
|
|||
long phase1Time;
|
||||
long phase1ThrottlingWaitTime;
|
||||
|
||||
long startTime;
|
||||
|
||||
int phase2Operations;
|
||||
long phase2Time;
|
||||
|
||||
|
@ -78,6 +80,7 @@ class RecoveryResponse implements Streamable {
|
|||
phase1ExistingTotalSize = in.readVLong();
|
||||
phase1Time = in.readVLong();
|
||||
phase1ThrottlingWaitTime = in.readVLong();
|
||||
startTime = in.readVLong();
|
||||
phase2Operations = in.readVInt();
|
||||
phase2Time = in.readVLong();
|
||||
phase3Operations = in.readVInt();
|
||||
|
@ -108,6 +111,7 @@ class RecoveryResponse implements Streamable {
|
|||
out.writeVLong(phase1ExistingTotalSize);
|
||||
out.writeVLong(phase1Time);
|
||||
out.writeVLong(phase1ThrottlingWaitTime);
|
||||
out.writeVLong(startTime);
|
||||
out.writeVInt(phase2Operations);
|
||||
out.writeVLong(phase2Time);
|
||||
out.writeVInt(phase3Operations);
|
||||
|
|
|
@ -194,13 +194,16 @@ public class RecoverySource extends AbstractComponent {
|
|||
if (shard.state() == IndexShardState.CLOSED) {
|
||||
throw new IndexShardClosedException(request.shardId());
|
||||
}
|
||||
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
|
||||
logger.trace("[{}][{}] recovery [phase2] to {}: start", request.shardId().index().name(), request.shardId().id(), request.targetNode());
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
stopWatch.stop();
|
||||
response.startTime = stopWatch.totalTime().millis();
|
||||
logger.trace("[{}][{}] recovery [phase2] to {}: start took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
|
||||
|
||||
logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
|
||||
stopWatch = new StopWatch().start();
|
||||
int totalOperations = sendSnapshot(snapshot);
|
||||
|
||||
stopWatch.stop();
|
||||
logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
|
||||
response.phase2Time = stopWatch.totalTime().millis();
|
||||
|
|
|
@ -195,7 +195,8 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
|
||||
.append("\n");
|
||||
sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
|
||||
sb.append(" phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
|
||||
sb.append(" phase2: start took [").append(timeValueMillis(recoveryStatus.startTime)).append("]\n");
|
||||
sb.append(" : recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
|
||||
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
|
||||
.append("\n");
|
||||
sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
|
||||
|
|
Loading…
Reference in New Issue