Add gateway recovery status to the indices status API, exposing both ongoing and summary recovery state when recovering from a gateway

This commit is contained in:
kimchy 2010-08-17 22:34:56 +03:00
parent 311520d146
commit 3f9034b41c
13 changed files with 579 additions and 260 deletions
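As a rough usage sketch only (not part of this commit), a Java API consumer could read the new per-shard gateway recovery information once it has an indices status response in hand; the shards() accessor on the response is assumed here, everything else comes from the classes added below:

    // Hypothetical sketch: report progress for shards still recovering from their gateway.
    for (ShardStatus shardStatus : indicesStatusResponse.shards()) {   // shards() accessor assumed
        GatewayRecoveryStatus gatewayRecovery = shardStatus.gatewayRecoveryStatus();
        if (gatewayRecovery != null && gatewayRecovery.stage() != GatewayRecoveryStatus.Stage.DONE) {
            System.out.println("gateway recovery: stage [" + gatewayRecovery.stage()
                    + "], recovered [" + gatewayRecovery.recoveredIndexSize()
                    + "] of expected [" + gatewayRecovery.expectedRecoveredIndexSize()
                    + "], translog operations [" + gatewayRecovery.recoveredTranslogOperations() + "]");
        }
    }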

View File

@ -0,0 +1,179 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
*/
public class GatewayRecoveryStatus {
public enum Stage {
INIT((byte) 0),
RETRY((byte) 1),
INDEX((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
private final byte value;
Stage(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static Stage fromValue(byte value) {
if (value == 0) {
return INIT;
} else if (value == 1) {
return RETRY;
} else if (value == 2) {
return INDEX;
} else if (value == 3) {
return TRANSLOG;
} else if (value == 4) {
return FINALIZE;
} else if (value == 5) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
}
}
final Stage stage;
final long startTime;
final long time;
final long throttlingTime;
final long indexThrottlingTime;
final long indexSize;
final long reusedIndexSize;
final long recoveredIndexSize;
final long recoveredTranslogOperations;
public GatewayRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexThrottlingTime, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.throttlingTime = throttlingTime;
this.indexThrottlingTime = indexThrottlingTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
this.recoveredTranslogOperations = recoveredTranslogOperations;
}
public Stage stage() {
return this.stage;
}
public long startTime() {
return this.startTime;
}
public long getStartTime() {
return this.startTime;
}
public TimeValue time() {
return TimeValue.timeValueMillis(time);
}
public TimeValue getTime() {
return time();
}
public TimeValue throttlingTime() {
return TimeValue.timeValueMillis(throttlingTime);
}
public TimeValue getThrottlingTime() {
return throttlingTime();
}
public TimeValue indexThrottlingTime() {
return TimeValue.timeValueMillis(indexThrottlingTime);
}
public TimeValue getIndexThrottlingTime() {
return indexThrottlingTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}
public ByteSizeValue getIndexSize() {
return indexSize();
}
public ByteSizeValue reusedIndexSize() {
return new ByteSizeValue(reusedIndexSize);
}
public ByteSizeValue getReusedIndexSize() {
return reusedIndexSize();
}
public ByteSizeValue expectedRecoveredIndexSize() {
return new ByteSizeValue(indexSize - reusedIndexSize);
}
public ByteSizeValue getExpectedRecoveredIndexSize() {
return expectedRecoveredIndexSize();
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue recoveredIndexSize() {
return new ByteSizeValue(recoveredIndexSize);
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue getRecoveredIndexSize() {
return recoveredIndexSize();
}
public long recoveredTranslogOperations() {
return recoveredTranslogOperations;
}
public long getRecoveredTranslogOperations() {
return recoveredTranslogOperations();
}
}
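Since the index sizes are exposed as ByteSizeValue, a caller can derive a rough completion percentage from the getters above; a minimal sketch, with the helper method being purely illustrative and not part of this commit:

    // Illustrative helper: estimate how far gateway recovery has progressed,
    // based on the sizes exposed by GatewayRecoveryStatus.
    static double gatewayRecoveryProgressPercent(GatewayRecoveryStatus status) {
        long expectedBytes = status.expectedRecoveredIndexSize().bytes();   // indexSize - reusedIndexSize
        if (expectedBytes == 0) {
            return 100.0;                                                   // everything was reused, nothing to copy
        }
        return 100.0 * status.recoveredIndexSize().bytes() / expectedBytes;
    }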

View File

@ -0,0 +1,168 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
*/
public class PeerRecoveryStatus {
public enum Stage {
INIT((byte) 0),
RETRY((byte) 1),
INDEX((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
private final byte value;
Stage(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static Stage fromValue(byte value) {
if (value == 0) {
return INIT;
} else if (value == 1) {
return RETRY;
} else if (value == 2) {
return INDEX;
} else if (value == 3) {
return TRANSLOG;
} else if (value == 4) {
return FINALIZE;
} else if (value == 5) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
}
}
final Stage stage;
final long startTime;
final long time;
final long throttlingTime;
final long indexSize;
final long reusedIndexSize;
final long recoveredIndexSize;
final long recoveredTranslogOperations;
public PeerRecoveryStatus(Stage stage, long startTime, long time, long throttlingTime, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.throttlingTime = throttlingTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
this.recoveredTranslogOperations = recoveredTranslogOperations;
}
public Stage stage() {
return this.stage;
}
public long startTime() {
return this.startTime;
}
public long getStartTime() {
return this.startTime;
}
public TimeValue time() {
return TimeValue.timeValueMillis(time);
}
public TimeValue getTime() {
return time();
}
public TimeValue throttlingTime() {
return TimeValue.timeValueMillis(throttlingTime);
}
public TimeValue getThrottlingTime() {
return throttlingTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}
public ByteSizeValue getIndexSize() {
return indexSize();
}
public ByteSizeValue reusedIndexSize() {
return new ByteSizeValue(reusedIndexSize);
}
public ByteSizeValue getReusedIndexSize() {
return reusedIndexSize();
}
public ByteSizeValue expectedRecoveredIndexSize() {
return new ByteSizeValue(indexSize - reusedIndexSize);
}
public ByteSizeValue getExpectedRecoveredIndexSize() {
return expectedRecoveredIndexSize();
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue recoveredIndexSize() {
return new ByteSizeValue(recoveredIndexSize);
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue getRecoveredIndexSize() {
return recoveredIndexSize();
}
public long recoveredTranslogOperations() {
return recoveredTranslogOperations;
}
public long getRecoveredTranslogOperations() {
return recoveredTranslogOperations();
}
}
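PeerRecoveryStatus.Stage follows the same byte-based contract as GatewayRecoveryStatus.Stage: value() is what ShardStatus.writeTo writes to the stream below, and fromValue(byte) reconstructs the stage in readFrom. A tiny round-trip sketch of that contract:

    // Sketch of the wire contract relied on by the ShardStatus serialization below.
    byte onWire = PeerRecoveryStatus.Stage.TRANSLOG.value();
    PeerRecoveryStatus.Stage restored = PeerRecoveryStatus.Stage.fromValue(onWire);
    assert restored == PeerRecoveryStatus.Stage.TRANSLOG;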

View File

@ -19,13 +19,11 @@
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShardState;
import java.io.IOException;
@ -34,7 +32,7 @@ import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*;
import static org.elasticsearch.common.unit.ByteSizeValue.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ShardStatus extends BroadcastShardOperationResponse {
@ -70,147 +68,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
}
}
public static class PeerRecoveryStatus {
public enum Stage {
INIT((byte) 0),
RETRY((byte) 1),
FILES((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
private final byte value;
Stage(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static Stage fromValue(byte value) {
if (value == 0) {
return INIT;
} else if (value == 1) {
return RETRY;
} else if (value == 2) {
return FILES;
} else if (value == 3) {
return TRANSLOG;
} else if (value == 4) {
return FINALIZE;
} else if (value == 5) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
}
}
final Stage stage;
final long startTime;
final long time;
final long retryTime;
final long indexSize;
final long reusedIndexSize;
final long recoveredIndexSize;
final long recoveredTranslogOperations;
public PeerRecoveryStatus(Stage stage, long startTime, long time, long retryTime, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.retryTime = retryTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
this.recoveredTranslogOperations = recoveredTranslogOperations;
}
public Stage stage() {
return this.stage;
}
public long startTime() {
return this.startTime;
}
public long getStartTime() {
return this.startTime;
}
public TimeValue time() {
return TimeValue.timeValueMillis(time);
}
public TimeValue getTime() {
return time();
}
public TimeValue retryTime() {
return TimeValue.timeValueMillis(retryTime);
}
public TimeValue getRetryTime() {
return retryTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}
public ByteSizeValue getIndexSize() {
return indexSize();
}
public ByteSizeValue reusedIndexSize() {
return new ByteSizeValue(reusedIndexSize);
}
public ByteSizeValue getReusedIndexSize() {
return reusedIndexSize();
}
public ByteSizeValue expectedRecoveredIndexSize() {
return new ByteSizeValue(indexSize - reusedIndexSize);
}
public ByteSizeValue getExpectedRecoveredIndexSize() {
return expectedRecoveredIndexSize();
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue recoveredIndexSize() {
return new ByteSizeValue(recoveredIndexSize);
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue getRecoveredIndexSize() {
return recoveredIndexSize();
}
public long recoveredTranslogOperations() {
return recoveredTranslogOperations;
}
public long getRecoveredTranslogOperations() {
return recoveredTranslogOperations();
}
}
private ShardRouting shardRouting;
IndexShardState state;
@ -225,6 +82,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
PeerRecoveryStatus peerRecoveryStatus;
GatewayRecoveryStatus gatewayRecoveryStatus;
ShardStatus() {
}
@ -289,6 +148,14 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return peerRecoveryStatus();
}
public GatewayRecoveryStatus gatewayRecoveryStatus() {
return gatewayRecoveryStatus;
}
public GatewayRecoveryStatus getGatewayRecoveryStatus() {
return gatewayRecoveryStatus();
}
public static ShardStatus readIndexShardStatus(StreamInput in) throws IOException {
ShardStatus shardStatus = new ShardStatus();
shardStatus.readFrom(in);
@ -319,15 +186,30 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(peerRecoveryStatus.stage.value);
out.writeByte(peerRecoveryStatus.stage.value());
out.writeVLong(peerRecoveryStatus.startTime);
out.writeVLong(peerRecoveryStatus.time);
out.writeVLong(peerRecoveryStatus.retryTime);
out.writeVLong(peerRecoveryStatus.throttlingTime);
out.writeVLong(peerRecoveryStatus.indexSize);
out.writeVLong(peerRecoveryStatus.reusedIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredTranslogOperations);
}
if (gatewayRecoveryStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(gatewayRecoveryStatus.stage.value());
out.writeVLong(gatewayRecoveryStatus.startTime);
out.writeVLong(gatewayRecoveryStatus.time);
out.writeVLong(gatewayRecoveryStatus.throttlingTime);
out.writeVLong(gatewayRecoveryStatus.indexThrottlingTime);
out.writeVLong(gatewayRecoveryStatus.indexSize);
out.writeVLong(gatewayRecoveryStatus.reusedIndexSize);
out.writeVLong(gatewayRecoveryStatus.recoveredIndexSize);
out.writeVLong(gatewayRecoveryStatus.recoveredTranslogOperations);
}
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -349,5 +231,10 @@ public class ShardStatus extends BroadcastShardOperationResponse {
peerRecoveryStatus = new PeerRecoveryStatus(PeerRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
if (in.readBoolean()) {
gatewayRecoveryStatus = new GatewayRecoveryStatus(GatewayRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
}
}

View File

@ -34,8 +34,10 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.PeerRecoveryStatus;
import org.elasticsearch.index.shard.recovery.RecoveryStatus;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
@ -77,10 +79,23 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return true;
}
/**
* Status goes across *all* shards.
*/
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
return shardIt.nextAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected boolean hasNextShard(ShardsIterator shardIt) {
return shardIt.hasNextAssigned();
}
@ -121,7 +136,8 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
}
@Override protected ShardStatus shardOperation(IndexShardStatusRequest request) throws ElasticSearchException {
InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shard(request.shardId());
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.index());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(request.shardId());
ShardStatus shardStatus = new ShardStatus(indexShard.routingEntry());
shardStatus.state = indexShard.state();
try {
@ -144,47 +160,67 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
}
}
// check ongoing recovery (from peer or gateway)
PeerRecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
if (peerRecoveryStatus == null) {
peerRecoveryStatus = peerRecoveryTarget.peerRecoveryStatus(indexShard.shardId());
}
if (peerRecoveryStatus != null) {
ShardStatus.PeerRecoveryStatus.Stage stage;
PeerRecoveryStatus.Stage stage;
switch (peerRecoveryStatus.stage()) {
case INIT:
stage = ShardStatus.PeerRecoveryStatus.Stage.INIT;
stage = PeerRecoveryStatus.Stage.INIT;
break;
case FILES:
stage = ShardStatus.PeerRecoveryStatus.Stage.FILES;
case INDEX:
stage = PeerRecoveryStatus.Stage.INDEX;
break;
case TRANSLOG:
stage = ShardStatus.PeerRecoveryStatus.Stage.TRANSLOG;
stage = PeerRecoveryStatus.Stage.TRANSLOG;
break;
case RETRY:
stage = ShardStatus.PeerRecoveryStatus.Stage.RETRY;
stage = PeerRecoveryStatus.Stage.RETRY;
break;
case FINALIZE:
stage = ShardStatus.PeerRecoveryStatus.Stage.FINALIZE;
stage = PeerRecoveryStatus.Stage.FINALIZE;
break;
case DONE:
stage = ShardStatus.PeerRecoveryStatus.Stage.DONE;
stage = PeerRecoveryStatus.Stage.DONE;
break;
default:
stage = ShardStatus.PeerRecoveryStatus.Stage.INIT;
stage = PeerRecoveryStatus.Stage.INIT;
}
shardStatus.peerRecoveryStatus = new ShardStatus.PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(),
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(),
peerRecoveryStatus.retryTime(), peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(),
peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations());
}
return shardStatus;
}
IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class);
org.elasticsearch.index.gateway.RecoveryStatus gatewayRecoveryStatus = gatewayService.recoveryStatus();
if (gatewayRecoveryStatus != null) {
GatewayRecoveryStatus.Stage stage;
switch (gatewayRecoveryStatus.stage()) {
case INIT:
stage = GatewayRecoveryStatus.Stage.INIT;
break;
case INDEX:
stage = GatewayRecoveryStatus.Stage.INDEX;
break;
case TRANSLOG:
stage = GatewayRecoveryStatus.Stage.TRANSLOG;
break;
case RETRY:
stage = GatewayRecoveryStatus.Stage.RETRY;
break;
case DONE:
stage = GatewayRecoveryStatus.Stage.DONE;
break;
default:
stage = GatewayRecoveryStatus.Stage.INIT;
}
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), gatewayRecoveryStatus.retryTime(),
gatewayRecoveryStatus.index().retryTime(), gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().existingTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
}
/**
* Status goes across *all* shards.
*/
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
return shardStatus;
}
public static class IndexShardStatusRequest extends BroadcastShardOperationRequest {

View File

@ -51,7 +51,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
/**
* Recovers the state of the shard from the gateway.
*/
RecoveryStatus recover() throws IndexShardGatewayRecoveryException;
void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException;
/**
* Snapshots the given shard into the gateway.

View File

@ -19,10 +19,10 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.CloseableIndexComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
@ -40,6 +40,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
@ -72,6 +74,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile ScheduledFuture snapshotScheduleFuture;
private RecoveryStatus recoveryStatus;
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway,
Store store, RecoveryThrottler recoveryThrottler) {
@ -101,6 +105,16 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
void onRecoveryFailed(IndexShardGatewayRecoveryException e);
}
public RecoveryStatus recoveryStatus() {
if (recoveryStatus == null) {
return recoveryStatus;
}
if (recoveryStatus.startTime() > 0 && recoveryStatus.stage() != RecoveryStatus.Stage.DONE) {
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
}
return recoveryStatus;
}
/**
* Recovers the state of the shard from the gateway.
*/
@ -122,13 +136,16 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
threadPool.cached().execute(new Runnable() {
@Override public void run() {
recoveryStatus = new RecoveryStatus();
recoveryStatus.updateStage(RecoveryStatus.Stage.INIT);
indexShard.recovering();
StopWatch throttlingWaitTime = new StopWatch().start();
// we know we are on a thread, we can spin till we can engage in recovery
while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
recoveryStatus.updateStage(RecoveryStatus.Stage.RETRY);
try {
Thread.sleep(recoveryThrottler.throttleInterval().millis());
recoveryStatus.retryTime(System.currentTimeMillis() - recoveryStatus.startTime());
} catch (InterruptedException e) {
if (indexShard.ignoreRecoveryAttempt()) {
listener.onIgnoreRecovery("Interrupted while waiting for recovery, but we should ignore ...");
@ -137,12 +154,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e));
}
}
throttlingWaitTime.stop();
try {
logger.debug("starting recovery from {} ...", shardGateway);
StopWatch stopWatch = new StopWatch().start();
RecoveryStatus recoveryStatus = shardGateway.recover();
shardGateway.recover(recoveryStatus);
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = -1;
@ -153,17 +168,20 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (indexShard.state() != IndexShardState.STARTED) {
indexShard.start();
}
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], took [").append(recoveryStatus.index().took()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(recoveryStatus.translog().took()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
indexShard.refresh(new Engine.Refresh(false));
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("], retry_time [").append(TimeValue.timeValueMillis(recoveryStatus.retryTime())).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("], throttling_wait [").append(TimeValue.timeValueMillis(recoveryStatus.index().retryTime())).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().existingTotalSize())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]");
logger.debug(sb.toString());
}
listener.onRecoveryDone();
scheduleSnapshotIfNeeded();
} catch (IndexShardGatewayRecoveryException e) {

View File

@ -19,9 +19,6 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -30,17 +27,20 @@ import java.util.concurrent.atomic.AtomicLong;
public class RecoveryStatus {
public static enum Stage {
NONE,
INIT,
RETRY,
INDEX,
TRANSLOG,
DONE
}
private Stage stage = Stage.NONE;
private Stage stage = Stage.INIT;
private long startTime;
private long startTime = System.currentTimeMillis();
private long took;
private long retryTime = 0;
private long time;
private Index index = new Index();
@ -63,12 +63,20 @@ public class RecoveryStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long retryTime() {
return this.retryTime;
}
public void took(long took) {
this.took = took;
public void retryTime(long retryTime) {
this.retryTime = retryTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public Index index() {
@ -80,9 +88,9 @@ public class RecoveryStatus {
}
public static class Translog {
volatile long currentTranslogOperations = 0;
private long startTime = -1;
private long took;
private long startTime = 0;
private long time;
private volatile long currentTranslogOperations = 0;
public long startTime() {
return this.startTime;
@ -92,12 +100,12 @@ public class RecoveryStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long time() {
return this.time;
}
public void took(long took) {
this.took = took;
public void time(long time) {
this.time = time;
}
public void addTranslogOperations(long count) {
@ -110,15 +118,15 @@ public class RecoveryStatus {
}
public static class Index {
private long startTime = -1;
private long took = -1;
private long startTime = 0;
private long time = 0;
private long version = -1;
private int numberOfFiles = 0;
private long totalSize = 0;
private int numberOfExistingFiles = 0;
private long existingTotalSize = 0;
private AtomicLong throttlingWaitTime = new AtomicLong();
private AtomicLong retryTime = new AtomicLong();
private AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
@ -129,12 +137,12 @@ public class RecoveryStatus {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
public long time() {
return this.time;
}
public void took(long took) {
this.took = took;
public void time(long time) {
this.time = time;
}
public long version() {
@ -152,24 +160,24 @@ public class RecoveryStatus {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return new ByteSizeValue(totalSize);
public long totalSize() {
return this.totalSize;
}
public int numberOfExistingFiles() {
return numberOfExistingFiles;
}
public ByteSizeValue existingTotalSize() {
return new ByteSizeValue(existingTotalSize);
public long existingTotalSize() {
return this.existingTotalSize;
}
public void addThrottlingTime(long delta) {
throttlingWaitTime.addAndGet(delta);
public void addRetryTime(long delta) {
retryTime.addAndGet(delta);
}
public TimeValue throttlingWaitTime() {
return new TimeValue(throttlingWaitTime.get());
public long retryTime() {
return this.retryTime.get();
}
public void updateVersion(long version) {

View File

@ -84,8 +84,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final RecoveryThrottler recoveryThrottler;
private final RecoveryStatus recoveryStatus;
protected final ByteSizeValue chunkSize;
protected final BlobStore blobStore;
@ -102,6 +100,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
private volatile RecoveryStatus recoveryStatus;
private volatile SnapshotStatus lastSnapshotStatus;
private volatile SnapshotStatus currentSnapshotStatus;
@ -403,22 +403,18 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
recoveryStatus.startTime(System.currentTimeMillis());
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
this.recoveryStatus = recoveryStatus;
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
recoverIndex();
recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
recoverTranslog();
recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.took(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
return recoveryStatus;
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
}
private void recoverTranslog() throws IndexShardGatewayRecoveryException {
@ -574,7 +570,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// lets reschedule to do it next time
threadPool.schedule(new Runnable() {
@Override public void run() {
recoveryStatus.index().addThrottlingTime(recoveryThrottler.throttleInterval().millis());
recoveryStatus.index().addRetryTime(recoveryThrottler.throttleInterval().millis());
if (recoveryThrottler.tryStream(shardId, fileToRecover.name())) {
// we managed to get a recovery going
recoverFile(fileToRecover, indicesBlobs, latch, failures);

View File

@ -56,10 +56,8 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
return recoveryStatus;
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
recoveryStatus.startTime(System.currentTimeMillis());
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus().index().startTime(System.currentTimeMillis());
recoveryStatus.translog().startTime(System.currentTimeMillis());
// in the none case, we simply start the shard
// clean the store, there should be nothing there...
try {
@ -68,10 +66,9 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
logger.warn("failed to clean store before starting shard", e);
}
indexShard.start();
recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.took(System.currentTimeMillis() - recoveryStatus.startTime());
return recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
}
@Override public String type() {

View File

@ -29,12 +29,12 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class PeerRecoveryStatus {
public class RecoveryStatus {
public static enum Stage {
INIT,
RETRY,
FILES,
INDEX,
TRANSLOG,
FINALIZE,
DONE

View File

@ -78,7 +78,7 @@ public class RecoveryTarget extends AbstractComponent {
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<ShardId, PeerRecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
@Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) {
@ -102,13 +102,13 @@ public class RecoveryTarget extends AbstractComponent {
});
}
public PeerRecoveryStatus peerRecoveryStatus(ShardId shardId) {
PeerRecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shardId);
public RecoveryStatus peerRecoveryStatus(ShardId shardId) {
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shardId);
if (peerRecoveryStatus == null) {
return null;
}
// update how long it takes if we are still recovering...
if (peerRecoveryStatus.startTime > 0 && peerRecoveryStatus.stage != PeerRecoveryStatus.Stage.DONE) {
if (peerRecoveryStatus.startTime > 0 && peerRecoveryStatus.stage != RecoveryStatus.Stage.DONE) {
peerRecoveryStatus.time = System.currentTimeMillis() - peerRecoveryStatus.startTime;
}
return peerRecoveryStatus;
@ -159,16 +159,16 @@ public class RecoveryTarget extends AbstractComponent {
return;
}
PeerRecoveryStatus recovery;
RecoveryStatus recovery;
if (fromRetry) {
recovery = onGoingRecoveries.get(request.shardId());
} else {
recovery = new PeerRecoveryStatus();
recovery = new RecoveryStatus();
onGoingRecoveries.put(request.shardId(), recovery);
}
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.stage = RecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
@ -189,7 +189,7 @@ public class RecoveryTarget extends AbstractComponent {
return;
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.stage = RecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
@ -231,7 +231,7 @@ public class RecoveryTarget extends AbstractComponent {
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.stage = RecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
@ -269,7 +269,7 @@ public class RecoveryTarget extends AbstractComponent {
private void removeAndCleanOnGoingRecovery(ShardId shardId) {
// clean it from the ongoing recoveries since it is being closed
PeerRecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId);
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId);
if (peerRecoveryStatus != null) {
// clean open index outputs
for (Map.Entry<String, IndexOutput> entry : peerRecoveryStatus.openIndexOutputs.entrySet()) {
@ -294,12 +294,12 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = PeerRecoveryStatus.Stage.TRANSLOG;
onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG;
shard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
@ -314,15 +314,15 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
PeerRecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId());
RecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId());
if (peerRecoveryStatus == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
peerRecoveryStatus.stage = PeerRecoveryStatus.Stage.FINALIZE;
peerRecoveryStatus.stage = RecoveryStatus.Stage.FINALIZE;
shard.performRecoveryFinalization(false, peerRecoveryStatus);
peerRecoveryStatus.time = System.currentTimeMillis() - peerRecoveryStatus.startTime;
peerRecoveryStatus.stage = PeerRecoveryStatus.Stage.DONE;
peerRecoveryStatus.stage = RecoveryStatus.Stage.DONE;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -340,7 +340,7 @@ public class RecoveryTarget extends AbstractComponent {
shard.performRecoveryOperation(operation);
}
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
@ -359,7 +359,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
@ -370,7 +370,7 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.phase1ExistingFileSizes = request.phase1ExistingFileSizes;
onGoingRecovery.phase1TotalSize = request.phase1TotalSize;
onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize;
onGoingRecovery.stage = PeerRecoveryStatus.Stage.FILES;
onGoingRecovery.stage = RecoveryStatus.Stage.INDEX;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -401,7 +401,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());

View File

@ -45,7 +45,7 @@ import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.recovery.PeerRecoveryStatus;
import org.elasticsearch.index.shard.recovery.RecoveryStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.TypeMissingException;
@ -84,7 +84,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ShardRouting shardRouting;
private PeerRecoveryStatus peerRecoveryStatus;
private RecoveryStatus peerRecoveryStatus;
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, Store store, Engine engine, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
@ -405,11 +405,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
/**
* The peer recovery status if this shard recovered from a peer shard.
*/
public PeerRecoveryStatus peerRecoveryStatus() {
public RecoveryStatus peerRecoveryStatus() {
return this.peerRecoveryStatus;
}
public void performRecoveryFinalization(boolean withFlush, PeerRecoveryStatus peerRecoveryStatus) throws ElasticSearchException {
public void performRecoveryFinalization(boolean withFlush, RecoveryStatus peerRecoveryStatus) throws ElasticSearchException {
performRecoveryFinalization(withFlush);
this.peerRecoveryStatus = peerRecoveryStatus;
}

View File

@ -139,14 +139,14 @@ public class RestIndicesStatusAction extends BaseRestHandler {
}
if (shardStatus.peerRecoveryStatus() != null) {
ShardStatus.PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();
PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();
builder.startObject("peer_recovery");
builder.field("stage", peerRecoveryStatus.stage());
builder.field("start_time_in_millis", peerRecoveryStatus.startTime());
builder.field("time", peerRecoveryStatus.time());
builder.field("took_in_millis", peerRecoveryStatus.time().millis());
builder.field("retry_time", peerRecoveryStatus.retryTime());
builder.field("retry_time_in_millis", peerRecoveryStatus.retryTime().millis());
builder.field("throttling_time", peerRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", peerRecoveryStatus.throttlingTime().millis());
builder.startObject("index");
builder.field("size", peerRecoveryStatus.indexSize());
@ -166,6 +166,36 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.endObject();
}
if (shardStatus.gatewayRecoveryStatus() != null) {
GatewayRecoveryStatus gatewayRecoveryStatus = shardStatus.gatewayRecoveryStatus();
builder.startObject("gateway_recovery");
builder.field("stage", gatewayRecoveryStatus.stage());
builder.field("start_time_in_millis", gatewayRecoveryStatus.startTime());
builder.field("time", gatewayRecoveryStatus.time());
builder.field("took_in_millis", gatewayRecoveryStatus.time().millis());
builder.field("throttling_time", gatewayRecoveryStatus.throttlingTime());
builder.field("throttling_time_in_millis", gatewayRecoveryStatus.throttlingTime().millis());
builder.startObject("index");
builder.field("size", gatewayRecoveryStatus.indexSize());
builder.field("size_in_bytes", gatewayRecoveryStatus.indexSize().bytes());
builder.field("reused_size", gatewayRecoveryStatus.reusedIndexSize());
builder.field("reused_size_in_bytes", gatewayRecoveryStatus.reusedIndexSize().bytes());
builder.field("expected_recovered_size", gatewayRecoveryStatus.expectedRecoveredIndexSize());
builder.field("expected_recovered_size_in_bytes", gatewayRecoveryStatus.expectedRecoveredIndexSize().bytes());
builder.field("recovered_size", gatewayRecoveryStatus.recoveredIndexSize());
builder.field("recovered_size_in_bytes", gatewayRecoveryStatus.recoveredIndexSize().bytes());
builder.field("throttling_time", gatewayRecoveryStatus.indexThrottlingTime());
builder.field("throttling_time_in_millis", gatewayRecoveryStatus.indexThrottlingTime().millis());
builder.endObject();
builder.startObject("translog");
builder.field("recovered", gatewayRecoveryStatus.recoveredTranslogOperations());
builder.endObject();
builder.endObject();
}
builder.endObject();
}
builder.endArray();
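For illustration only, the gateway_recovery object built by RestIndicesStatusAction above would render roughly like this inside a shard's status entry (all values are made up; the field names mirror the builder calls above):

    "gateway_recovery" : {
        "stage" : "INDEX",
        "start_time_in_millis" : 1282073696000,
        "time" : "3.2s",
        "took_in_millis" : 3200,
        "throttling_time" : "1.2s",
        "throttling_time_in_millis" : 1200,
        "index" : {
            "size" : "100mb",
            "size_in_bytes" : 104857600,
            "reused_size" : "70mb",
            "reused_size_in_bytes" : 73400320,
            "expected_recovered_size" : "30mb",
            "expected_recovered_size_in_bytes" : 31457280,
            "recovered_size" : "10mb",
            "recovered_size_in_bytes" : 10485760,
            "throttling_time" : "800ms",
            "throttling_time_in_millis" : 800
        },
        "translog" : {
            "recovered" : 0
        }
    }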