add peer recovery status to the indices status API exposing both on going and summary when recovering from a peer shard

This commit is contained in:
kimchy 2010-08-17 21:23:05 +03:00
parent 96fc16dddb
commit 5fb80c391b
9 changed files with 407 additions and 139 deletions

View File

@ -111,26 +111,6 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
return storeSize();
}
public ByteSizeValue estimatedFlushableMemorySize() {
long bytes = -1;
for (ShardStatus shard : shards()) {
if (shard.estimatedFlushableMemorySize() != null) {
if (bytes == -1) {
bytes = 0;
}
bytes += shard.estimatedFlushableMemorySize().bytes();
}
}
if (bytes == -1) {
return null;
}
return new ByteSizeValue(bytes);
}
public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}
public long translogOperations() {
long translogOperations = -1;
for (ShardStatus shard : shards()) {
@ -148,7 +128,12 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
return translogOperations();
}
private transient Docs docs;
public Docs docs() {
if (docs != null) {
return docs;
}
Docs docs = new Docs();
for (ShardStatus shard : shards()) {
if (!shard.shardRouting().primary()) {
@ -174,7 +159,12 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
docs.deletedDocs += shard.docs().deletedDocs();
}
}
return docs;
if (docs.numDocs == -1) {
this.docs = Docs.UNKNOWN;
} else {
this.docs = docs;
}
return this.docs;
}
public Docs getDocs() {

View File

@ -139,26 +139,6 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
return storeSize();
}
public ByteSizeValue estimatedFlushableMemorySize() {
long bytes = -1;
for (IndexShardStatus shard : this) {
if (shard.estimatedFlushableMemorySize() != null) {
if (bytes == -1) {
bytes = 0;
}
bytes += shard.estimatedFlushableMemorySize().bytes();
}
}
if (bytes == -1) {
return null;
}
return new ByteSizeValue(bytes);
}
public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}
public long translogOperations() {
long translogOperations = -1;
for (IndexShardStatus shard : this) {
@ -176,7 +156,12 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
return translogOperations();
}
private transient Docs docs;
public Docs docs() {
if (docs != null) {
return docs;
}
Docs docs = new Docs();
for (IndexShardStatus shard : this) {
if (shard.docs().numDocs() != -1) {
@ -198,6 +183,11 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
docs.deletedDocs += shard.docs().deletedDocs();
}
}
if (docs.numDocs == -1) {
this.docs = Docs.UNKNOWN;
} else {
this.docs = docs;
}
return docs;
}

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShardState;
import java.io.IOException;
@ -68,20 +70,161 @@ public class ShardStatus extends BroadcastShardOperationResponse {
}
}
public static class PeerRecoveryStatus {
public enum Stage {
INIT((byte) 0),
RETRY((byte) 1),
FILES((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
private final byte value;
Stage(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static Stage fromValue(byte value) {
if (value == 0) {
return INIT;
} else if (value == 1) {
return RETRY;
} else if (value == 2) {
return FILES;
} else if (value == 3) {
return TRANSLOG;
} else if (value == 4) {
return FINALIZE;
} else if (value == 5) {
return DONE;
}
throw new ElasticSearchIllegalArgumentException("No stage found for [" + value + ']');
}
}
final Stage stage;
final long startTime;
final long took;
final long retryTime;
final long indexSize;
final long reusedIndexSize;
final long recoveredIndexSize;
final long recoveredTranslogOperations;
public PeerRecoveryStatus(Stage stage, long startTime, long took, long retryTime, long indexSize, long reusedIndexSize,
long recoveredIndexSize, long recoveredTranslogOperations) {
this.stage = stage;
this.startTime = startTime;
this.took = took;
this.retryTime = retryTime;
this.indexSize = indexSize;
this.reusedIndexSize = reusedIndexSize;
this.recoveredIndexSize = recoveredIndexSize;
this.recoveredTranslogOperations = recoveredTranslogOperations;
}
public Stage stage() {
return this.stage;
}
public long startTime() {
return this.startTime;
}
public long getStartTime() {
return this.startTime;
}
public TimeValue took() {
return TimeValue.timeValueMillis(took);
}
public TimeValue getTook() {
return took();
}
public TimeValue retryTime() {
return TimeValue.timeValueMillis(retryTime);
}
public TimeValue getRetryTime() {
return retryTime();
}
public ByteSizeValue indexSize() {
return new ByteSizeValue(indexSize);
}
public ByteSizeValue getIndexSize() {
return indexSize();
}
public ByteSizeValue reusedIndexSize() {
return new ByteSizeValue(reusedIndexSize);
}
public ByteSizeValue getReusedIndexSize() {
return reusedIndexSize();
}
public ByteSizeValue expectedRecoveredIndexSize() {
return new ByteSizeValue(indexSize - reusedIndexSize);
}
public ByteSizeValue getExpectedRecoveredIndexSize() {
return expectedRecoveredIndexSize();
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue recoveredIndexSize() {
return new ByteSizeValue(recoveredIndexSize);
}
/**
* How much of the index has been recovered.
*/
public ByteSizeValue getRecoveredIndexSize() {
return recoveredIndexSize();
}
public long recoveredTranslogOperations() {
return recoveredTranslogOperations;
}
public long getRecoveredTranslogOperations() {
return recoveredTranslogOperations();
}
}
private ShardRouting shardRouting;
IndexShardState state;
ByteSizeValue storeSize;
ByteSizeValue estimatedFlushableMemorySize;
long translogId = -1;
long translogOperations = -1;
Docs docs = Docs.UNKNOWN;
PeerRecoveryStatus peerRecoveryStatus;
ShardStatus() {
}
@ -114,14 +257,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return storeSize();
}
public ByteSizeValue estimatedFlushableMemorySize() {
return estimatedFlushableMemorySize;
}
public ByteSizeValue getEstimatedFlushableMemorySize() {
return estimatedFlushableMemorySize();
}
public long translogId() {
return translogId;
}
@ -146,6 +281,14 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return docs();
}
public PeerRecoveryStatus peerRecoveryStatus() {
return peerRecoveryStatus;
}
public PeerRecoveryStatus getPeerRecoveryStatus() {
return peerRecoveryStatus();
}
public static ShardStatus readIndexShardStatus(StreamInput in) throws IOException {
ShardStatus shardStatus = new ShardStatus();
shardStatus.readFrom(in);
@ -162,17 +305,29 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeBoolean(true);
storeSize.writeTo(out);
}
if (estimatedFlushableMemorySize == null) {
out.writeLong(translogId);
out.writeLong(translogOperations);
if (docs == Docs.UNKNOWN) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
estimatedFlushableMemorySize.writeTo(out);
out.writeInt(docs.numDocs());
out.writeInt(docs.maxDoc());
out.writeInt(docs.deletedDocs());
}
if (peerRecoveryStatus == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(peerRecoveryStatus.stage.value);
out.writeVLong(peerRecoveryStatus.startTime);
out.writeVLong(peerRecoveryStatus.took);
out.writeVLong(peerRecoveryStatus.retryTime);
out.writeVLong(peerRecoveryStatus.indexSize);
out.writeVLong(peerRecoveryStatus.reusedIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredIndexSize);
out.writeVLong(peerRecoveryStatus.recoveredTranslogOperations);
}
out.writeLong(translogId);
out.writeLong(translogOperations);
out.writeInt(docs.numDocs());
out.writeInt(docs.maxDoc());
out.writeInt(docs.deletedDocs());
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -182,14 +337,17 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) {
storeSize = readBytesSizeValue(in);
}
if (in.readBoolean()) {
estimatedFlushableMemorySize = readBytesSizeValue(in);
}
translogId = in.readLong();
translogOperations = in.readLong();
docs = new Docs();
docs.numDocs = in.readInt();
docs.maxDoc = in.readInt();
docs.deletedDocs = in.readInt();
if (in.readBoolean()) {
docs = new Docs();
docs.numDocs = in.readInt();
docs.maxDoc = in.readInt();
docs.deletedDocs = in.readInt();
}
if (in.readBoolean()) {
peerRecoveryStatus = new PeerRecoveryStatus(PeerRecoveryStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}
}
}

View File

@ -30,9 +30,13 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.PeerRecoveryStatus;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -49,8 +53,12 @@ import static org.elasticsearch.common.collect.Lists.*;
*/
public class TransportIndicesStatusAction extends TransportBroadcastOperationAction<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {
@Inject public TransportIndicesStatusAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
private final RecoveryTarget peerRecoveryTarget;
@Inject public TransportIndicesStatusAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, RecoveryTarget peerRecoveryTarget) {
super(settings, threadPool, clusterService, transportService, indicesService);
this.peerRecoveryTarget = peerRecoveryTarget;
}
@Override protected String transportAction() {
@ -69,6 +77,14 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return true;
}
@Override protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
return shardIt.nextAssignedOrNull();
}
@Override protected boolean hasNextShard(ShardsIterator shardIt) {
return shardIt.hasNextAssigned();
}
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -113,18 +129,54 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
} catch (IOException e) {
// failure to get the store size...
}
shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize();
shardStatus.translogId = indexShard.translog().currentId();
shardStatus.translogOperations = indexShard.translog().size();
Engine.Searcher searcher = indexShard.searcher();
try {
shardStatus.docs = new ShardStatus.Docs();
shardStatus.docs.numDocs = searcher.reader().numDocs();
shardStatus.docs.maxDoc = searcher.reader().maxDoc();
shardStatus.docs.deletedDocs = searcher.reader().numDeletedDocs();
} finally {
searcher.release();
if (indexShard.state() == IndexShardState.STARTED) {
// shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize();
shardStatus.translogId = indexShard.translog().currentId();
shardStatus.translogOperations = indexShard.translog().size();
Engine.Searcher searcher = indexShard.searcher();
try {
shardStatus.docs = new ShardStatus.Docs();
shardStatus.docs.numDocs = searcher.reader().numDocs();
shardStatus.docs.maxDoc = searcher.reader().maxDoc();
shardStatus.docs.deletedDocs = searcher.reader().numDeletedDocs();
} finally {
searcher.release();
}
}
// check on going recovery (from peer or gateway)
PeerRecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
if (peerRecoveryStatus == null) {
peerRecoveryStatus = peerRecoveryTarget.peerRecoveryStatus(indexShard.shardId());
}
if (peerRecoveryStatus != null) {
ShardStatus.PeerRecoveryStatus.Stage stage;
switch (peerRecoveryStatus.stage()) {
case INIT:
stage = ShardStatus.PeerRecoveryStatus.Stage.INIT;
break;
case FILES:
stage = ShardStatus.PeerRecoveryStatus.Stage.FILES;
break;
case TRANSLOG:
stage = ShardStatus.PeerRecoveryStatus.Stage.TRANSLOG;
break;
case RETRY:
stage = ShardStatus.PeerRecoveryStatus.Stage.RETRY;
break;
case FINALIZE:
stage = ShardStatus.PeerRecoveryStatus.Stage.FINALIZE;
break;
case DONE:
stage = ShardStatus.PeerRecoveryStatus.Stage.DONE;
break;
default:
stage = ShardStatus.PeerRecoveryStatus.Stage.INIT;
}
shardStatus.peerRecoveryStatus = new ShardStatus.PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.took(),
peerRecoveryStatus.retryTime(), peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(),
peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations());
}
return shardStatus;
}

View File

@ -29,20 +29,22 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class OnGoingRecovery {
public class PeerRecoveryStatus {
public static enum Stage {
INIT,
RETRY,
FILES,
TRANSLOG,
FINALIZE
FINALIZE,
DONE
}
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
final long startTimeImMillis = System.currentTimeMillis();
volatile long retryTimeInMillis = 0;
final long startTime = System.currentTimeMillis();
long took;
volatile long retryTime = 0;
List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
@ -53,4 +55,36 @@ public class OnGoingRecovery {
volatile Stage stage = Stage.INIT;
volatile long currentTranslogOperations = 0;
AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
return startTime;
}
public long took() {
return this.took;
}
public long retryTime() {
return retryTime;
}
public long phase1TotalSize() {
return phase1TotalSize;
}
public long phase1ExistingTotalSize() {
return phase1ExistingTotalSize;
}
public Stage stage() {
return stage;
}
public long currentTranslogOperations() {
return currentTranslogOperations;
}
public long currentFilesSize() {
return currentFilesSize.get();
}
}

View File

@ -78,7 +78,7 @@ public class RecoveryTarget extends AbstractComponent {
private final RecoveryThrottler recoveryThrottler;
private final ConcurrentMap<ShardId, OnGoingRecovery> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, PeerRecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
@Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle, RecoveryThrottler recoveryThrottler) {
@ -102,6 +102,10 @@ public class RecoveryTarget extends AbstractComponent {
});
}
public PeerRecoveryStatus peerRecoveryStatus(ShardId shardId) {
return onGoingRecoveries.get(shardId);
}
public void startRecovery(final StartRecoveryRequest request, final boolean fromRetry, final RecoveryListener listener) {
if (request.sourceNode() == null) {
listener.onIgnoreRecovery(false, "No node to recovery from, retry on next cluster state update");
@ -147,17 +151,17 @@ public class RecoveryTarget extends AbstractComponent {
return;
}
OnGoingRecovery recovery;
PeerRecoveryStatus recovery;
if (fromRetry) {
recovery = onGoingRecoveries.get(request.shardId());
} else {
recovery = new OnGoingRecovery();
recovery = new PeerRecoveryStatus();
onGoingRecoveries.put(request.shardId(), recovery);
}
if (!recoveryThrottler.tryRecovery(shard.shardId(), "peer recovery target")) {
recovery.stage = OnGoingRecovery.Stage.RETRY;
recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
@ -177,8 +181,8 @@ public class RecoveryTarget extends AbstractComponent {
return;
}
logger.trace("[{}][{}] retrying recovery in [{}], source shard is busy", request.shardId().index().name(), request.shardId().id(), recoveryThrottler.throttleInterval());
recovery.stage = OnGoingRecovery.Stage.RETRY;
recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
@ -219,8 +223,8 @@ public class RecoveryTarget extends AbstractComponent {
}
if (cause instanceof IndexShardNotStartedException || cause instanceof IndexMissingException || cause instanceof IndexShardMissingException) {
recovery.stage = OnGoingRecovery.Stage.RETRY;
recovery.retryTimeInMillis = System.currentTimeMillis() - recovery.startTimeImMillis;
recovery.stage = PeerRecoveryStatus.Stage.RETRY;
recovery.retryTime = System.currentTimeMillis() - recovery.startTime;
listener.onRetryRecovery(recoveryThrottler.throttleInterval());
return;
}
@ -257,10 +261,10 @@ public class RecoveryTarget extends AbstractComponent {
private void removeAndCleanOnGoingRecovery(ShardId shardId) {
// clean it from the on going recoveries since it is being closed
OnGoingRecovery onGoingRecovery = onGoingRecoveries.remove(shardId);
if (onGoingRecovery != null) {
PeerRecoveryStatus peerRecoveryStatus = onGoingRecoveries.remove(shardId);
if (peerRecoveryStatus != null) {
// clean open index outputs
for (Map.Entry<String, IndexOutput> entry : onGoingRecovery.openIndexOutputs.entrySet()) {
for (Map.Entry<String, IndexOutput> entry : peerRecoveryStatus.openIndexOutputs.entrySet()) {
synchronized (entry.getValue()) {
try {
entry.getValue().close();
@ -269,6 +273,7 @@ public class RecoveryTarget extends AbstractComponent {
}
}
}
peerRecoveryStatus.openIndexOutputs = null;
}
}
@ -281,12 +286,12 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = OnGoingRecovery.Stage.TRANSLOG;
onGoingRecovery.stage = PeerRecoveryStatus.Stage.TRANSLOG;
shard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
@ -301,13 +306,15 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
PeerRecoveryStatus peerRecoveryStatus = onGoingRecoveries.get(shard.shardId());
if (peerRecoveryStatus == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = OnGoingRecovery.Stage.FINALIZE;
shard.performRecoveryFinalization(false);
peerRecoveryStatus.stage = PeerRecoveryStatus.Stage.FINALIZE;
shard.performRecoveryFinalization(false, peerRecoveryStatus);
peerRecoveryStatus.took = System.currentTimeMillis() - peerRecoveryStatus.startTime;
peerRecoveryStatus.stage = PeerRecoveryStatus.Stage.DONE;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -325,7 +332,7 @@ public class RecoveryTarget extends AbstractComponent {
shard.performRecoveryOperation(operation);
}
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
@ -344,7 +351,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
@ -355,7 +362,7 @@ public class RecoveryTarget extends AbstractComponent {
onGoingRecovery.phase1ExistingFileSizes = request.phase1ExistingFileSizes;
onGoingRecovery.phase1TotalSize = request.phase1TotalSize;
onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize;
onGoingRecovery.stage = OnGoingRecovery.Stage.FILES;
onGoingRecovery.stage = PeerRecoveryStatus.Stage.FILES;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
@ -386,7 +393,7 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
PeerRecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.recovery.PeerRecoveryStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.TypeMissingException;
@ -83,6 +84,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ShardRouting shardRouting;
private PeerRecoveryStatus peerRecoveryStatus;
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, Store store, Engine engine, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
super(shardId, indexSettings);
@ -399,6 +402,18 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.start();
}
/**
* The peer recovery status if this shard recovered from a peer shard.
*/
public PeerRecoveryStatus peerRecoveryStatus() {
return this.peerRecoveryStatus;
}
public void performRecoveryFinalization(boolean withFlush, PeerRecoveryStatus peerRecoveryStatus) throws ElasticSearchException {
performRecoveryFinalization(withFlush);
this.peerRecoveryStatus = peerRecoveryStatus;
}
public void performRecoveryFinalization(boolean withFlush) throws ElasticSearchException {
if (withFlush) {
engine.flush(new Engine.Flush());

View File

@ -377,7 +377,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
@Override public void onRecoveryDone() {
shardStateAction.shardStarted(shardRouting, "after recovery (backup) from node [" + request.sourceNode() + "]");
shardStateAction.shardStarted(shardRouting, "after recovery (replica) from node [" + request.sourceNode() + "]");
}
@Override public void onRetryRecovery(TimeValue retryAfter) {

View File

@ -85,27 +85,21 @@ public class RestIndicesStatusAction extends BaseRestHandler {
}
builder.endObject();
if (indexStatus.storeSize() == null) {
builder.nullField("store_size");
builder.nullField("store_size_in_bytes");
} else {
if (indexStatus.storeSize() != null) {
builder.field("store_size", indexStatus.storeSize().toString());
builder.field("store_size_in_bytes", indexStatus.storeSize().bytes());
}
if (indexStatus.estimatedFlushableMemorySize() == null) {
builder.nullField("estimated_flushable_memory_size");
builder.nullField("estimated_flushable_memory_size_in_bytes");
} else {
builder.field("estimated_flushable_memory_size", indexStatus.estimatedFlushableMemorySize().toString());
builder.field("estimated_flushable_memory_size_in_bytes", indexStatus.estimatedFlushableMemorySize().bytes());
if (indexStatus.translogOperations() != -1) {
builder.field("translog_operations", indexStatus.translogOperations());
}
builder.field("translog_operations", indexStatus.translogOperations());
builder.startObject("docs");
builder.field("num_docs", indexStatus.docs().numDocs());
builder.field("max_doc", indexStatus.docs().maxDoc());
builder.field("deleted_docs", indexStatus.docs().deletedDocs());
builder.endObject();
if (indexStatus.docs() != IndexStatus.Docs.UNKNOWN) {
builder.startObject("docs");
builder.field("num_docs", indexStatus.docs().numDocs());
builder.field("max_doc", indexStatus.docs().maxDoc());
builder.field("deleted_docs", indexStatus.docs().deletedDocs());
builder.endObject();
}
builder.startObject("shards");
for (IndexShardStatus indexShardStatus : indexStatus) {
@ -123,28 +117,56 @@ public class RestIndicesStatusAction extends BaseRestHandler {
.endObject();
builder.field("state", shardStatus.state());
if (shardStatus.storeSize() == null) {
builder.nullField("store_size");
builder.nullField("store_size_in_bytes");
} else {
builder.field("store_size", shardStatus.storeSize().toString());
builder.field("store_size_in_bytes", shardStatus.storeSize().bytes());
if (shardStatus.storeSize() != null) {
builder.startObject("index");
builder.field("size", shardStatus.storeSize().toString());
builder.field("size_in_bytes", shardStatus.storeSize().bytes());
builder.endObject();
}
if (shardStatus.estimatedFlushableMemorySize() == null) {
builder.nullField("estimated_flushable_memory_size");
builder.nullField("estimated_flushable_memory_size_in_bytes");
} else {
builder.field("estimated_flushable_memory_size", shardStatus.estimatedFlushableMemorySize().toString());
builder.field("estimated_flushable_memory_size_in_bytes", shardStatus.estimatedFlushableMemorySize().bytes());
if (shardStatus.translogId() != -1) {
builder.startObject("translog");
builder.field("id", shardStatus.translogId());
builder.field("operations", shardStatus.translogOperations());
builder.endObject();
}
builder.field("translog_id", shardStatus.translogId());
builder.field("translog_operations", shardStatus.translogOperations());
builder.startObject("docs");
builder.field("num_docs", shardStatus.docs().numDocs());
builder.field("max_doc", shardStatus.docs().maxDoc());
builder.field("deleted_docs", shardStatus.docs().deletedDocs());
builder.endObject();
if (shardStatus.docs() != ShardStatus.Docs.UNKNOWN) {
builder.startObject("docs");
builder.field("num_docs", shardStatus.docs().numDocs());
builder.field("max_doc", shardStatus.docs().maxDoc());
builder.field("deleted_docs", shardStatus.docs().deletedDocs());
builder.endObject();
}
if (shardStatus.peerRecoveryStatus() != null) {
ShardStatus.PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();
builder.startObject("peer_recovery");
builder.field("stage", peerRecoveryStatus.stage());
builder.field("start_time_in_millis", peerRecoveryStatus.startTime());
if (peerRecoveryStatus.took().millis() > 0) {
builder.field("took", peerRecoveryStatus.took());
builder.field("took_in_millis", peerRecoveryStatus.took().millis());
}
builder.field("retry_time", peerRecoveryStatus.retryTime());
builder.field("retry_time_in_millis", peerRecoveryStatus.retryTime().millis());
builder.startObject("index");
builder.field("size", peerRecoveryStatus.indexSize());
builder.field("size_in_bytes", peerRecoveryStatus.indexSize().bytes());
builder.field("reused_size", peerRecoveryStatus.reusedIndexSize());
builder.field("reused_size_in_bytes", peerRecoveryStatus.reusedIndexSize().bytes());
builder.field("expected_recovered_size", peerRecoveryStatus.expectedRecoveredIndexSize());
builder.field("expected_recovered_size_in_bytes", peerRecoveryStatus.expectedRecoveredIndexSize().bytes());
builder.field("recovered_size", peerRecoveryStatus.recoveredIndexSize());
builder.field("recovered_size_in_bytes", peerRecoveryStatus.recoveredIndexSize().bytes());
builder.endObject();
builder.startObject("translog");
builder.field("recovered", peerRecoveryStatus.recoveredTranslogOperations());
builder.endObject();
builder.endObject();
}
builder.endObject();
}