Recovery: RecoveryState clean up

To support the `_recovery` API, the recovery process keeps track of current progress in a class called RecoveryState. This class currently has some issues, mostly around concurrency (see #6644). This PR cleans it up as well as other issues around it:

- Make the Index subsection API cleaner:
- remove redundant information - all calculation is done based on the underlying file map
- clearer definition of what is what: total files, vs reused files (local files that match the source) vs recovered files (copied over). % based progress is reported based on recovered files only.
- cleaned up json response to match other API (sadly this breaks the structure). We now properly report human values for dates and other units.
- Add more robust unit testing
- Detail flag was passed along as state (it's now a ToXContent param)
- State lookup during reporting is now always done via the IndexShard , no more fall backs to many other classes.
- Clean up APIs around time and move the little computations to the state class as opposed to doing them out of the API

I also improved error messages out of the REST testing infra for things I ran into.

Closes #6644
Closes #9811
This commit is contained in:
Boaz Leskes 2015-02-10 08:54:03 +01:00
parent 99714ee1bd
commit 3e32dd985a
35 changed files with 669 additions and 588 deletions

View File

@ -8,29 +8,30 @@ For example, the following command would show recovery information for the indic
[source,js]
--------------------------------------------------
curl -XGET http://localhost:9200/index1,index2/_recovery?pretty=true
curl -XGET http://localhost:9200/index1,index2/_recovery
--------------------------------------------------
To see cluster-wide recovery status simply leave out the index names.
[source,js]
--------------------------------------------------
curl -XGET http://localhost:9200/_recovery?pretty=true
curl -XGET http://localhost:9200/_recovery?pretty&human
--------------------------------------------------
Response:
coming[1.5.0, this syntax was changed to fix inconsistencies with other APIs]
[source,js]
--------------------------------------------------
{
"index1" : {
"shards" : [ {
"id" : 0,
"type" : "snapshot",
"stage" : "index",
"type" : "SNAPSHOT",
"stage" : "INDEX",
"primary" : true,
"start_time" : "2014-02-24T12:15:59.716",
"stop_time" : 0,
"start_time_in_millis": 1393244159716,
"total_time" : "2.9m"
"total_time_in_millis" : 175576,
"source" : {
"repository" : "my_repository",
@ -44,26 +45,33 @@ Response:
"name" : "my_es_node"
},
"index" : {
"size" : {
"total" : "75.4mb"
"total_in_bytes" : 79063092,
"reused" : "0b",
"reused_in_bytes" : 0,
"recovered" : "65.7mb",
"recovered_in_bytes" : 68891939,
"percent" : "87.1%"
},
"files" : {
"total" : 73,
"reused" : 0,
"recovered" : 69,
"percent" : "94.5%"
},
"bytes" : {
"total" : 79063092,
"reused" : 0,
"recovered" : 68891939,
"percent" : "87.1%"
},
"total_time" : "0s",
"total_time_in_millis" : 0
},
"translog" : {
"recovered" : 0,
"total_time" : "0s",
"total_time_in_millis" : 0
},
"start" : {
"check_index_time" : 0,
"check_index_time" : "0s",
"check_index_time_in_millis" : 0,
"total_time" : "0s",
"total_time_in_millis" : 0
}
} ]
@ -80,9 +88,10 @@ In some cases a higher level of detail may be preferable. Setting "detailed=true
[source,js]
--------------------------------------------------
curl -XGET http://localhost:9200/_recovery?pretty=true&detailed=true
curl -XGET http://localhost:9200/_recovery?pretty&human&detailed=true
--------------------------------------------------
coming[1.5.0, this syntax was changed to fix inconsistencies with other APIs]
Response:
[source,js]
@ -91,11 +100,14 @@ Response:
"index1" : {
"shards" : [ {
"id" : 0,
"type" : "gateway",
"stage" : "done",
"type" : "GATEWAY",
"stage" : "DONE",
"primary" : true,
"start_time" : "2014-02-24T12:38:06.349",
"start_time_in_millis" : "1393245486349",
"stop_time" : "2014-02-24T12:38:08.464",
"stop_time_in_millis" : "1393245488464",
"total_time" : "2.1s",
"total_time_in_millis" : 2115,
"source" : {
"id" : "RGMdRc-yQWWKIBM4DGvwqQ",
@ -110,10 +122,19 @@ Response:
"name" : "my_es_node"
},
"index" : {
"size" : {
"total" : "24.7mb",
"total_in_bytes" : 26001617,
"reused" : "24.7mb",
"reused_in_bytes" : 26001617,
"recovered" : "0b",
"recovered_in_bytes" : 0,
"percent" : "100.0%"
},
"files" : {
"total" : 26,
"reused" : 26,
"recovered" : 26,
"recovered" : 0,
"percent" : "100.0%",
"details" : [ {
"name" : "segments.gen",
@ -131,20 +152,17 @@ Response:
...
]
},
"bytes" : {
"total" : 26001617,
"reused" : 26001617,
"recovered" : 26001617,
"percent" : "100.0%"
},
"total_time" : "2ms",
"total_time_in_millis" : 2
},
"translog" : {
"recovered" : 71,
"total_time" : "2.0s",
"total_time_in_millis" : 2025
},
"start" : {
"check_index_time" : 0,
"total_time" : "88ms",
"total_time_in_millis" : 88
}
} ]

View File

@ -40,6 +40,8 @@
\d+\.\d+% \s+ # files_percent
\d+ \s+ # bytes
\d+\.\d+% \s+ # bytes_percent
\d+ \s+ # total_files
\d+ \s+ # total_bytes
\n
)+
$/

View File

@ -28,10 +28,10 @@
- gte: { test_1.shards.0.index.files.reused: 0 }
- gte: { test_1.shards.0.index.files.recovered: 0 }
- match: { test_1.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.bytes.total: 0 }
- gte: { test_1.shards.0.index.bytes.reused: 0 }
- gte: { test_1.shards.0.index.bytes.recovered: 0 }
- match: { test_1.shards.0.index.bytes.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_1.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.translog.recovered: 0 }
- gte: { test_1.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_1.shards.0.start.check_index_time_in_millis: 0 }

View File

@ -28,9 +28,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
/**
* Information regarding the recovery state of indices and their associated shards.
@ -64,17 +64,6 @@ public class RecoveryResponse extends BroadcastOperationResponse implements ToXC
return shardResponses.size() > 0;
}
public void addShardRecovery(String index, ShardRecoveryResponse shardRecoveryResponse) {
List<ShardRecoveryResponse> shardRecoveries = shardResponses.get(index);
if (shardRecoveries == null) {
shardRecoveries = new ArrayList<>();
shardResponses.put(index, shardRecoveries);
}
shardRecoveries.add(shardRecoveryResponse);
}
public boolean detailed() {
return detailed;
}
@ -99,7 +88,6 @@ public class RecoveryResponse extends BroadcastOperationResponse implements ToXC
builder.startArray("shards");
for (ShardRecoveryResponse recoveryResponse : responses) {
builder.startObject();
recoveryResponse.detailed(this.detailed);
recoveryResponse.toXContent(builder, params);
builder.endObject();
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -35,7 +36,6 @@ import java.io.IOException;
public class ShardRecoveryResponse extends BroadcastShardOperationResponse implements ToXContent {
RecoveryState recoveryState;
private boolean detailed = false;
public ShardRecoveryResponse() { }
@ -58,23 +58,15 @@ public class ShardRecoveryResponse extends BroadcastShardOperationResponse imple
}
/**
* Gets the recovery state information for the shard.
* Gets the recovery state information for the shard. Null if shard wasn't recovered / recovery didn't start yet.
*
* @return Recovery state
*/
@Nullable
public RecoveryState recoveryState() {
return recoveryState;
}
public boolean detailed() {
return detailed;
}
public void detailed(boolean detailed) {
this.detailed = detailed;
this.recoveryState.setDetailed(detailed);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
recoveryState.toXContent(builder, params);

View File

@ -34,13 +34,11 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -58,15 +56,13 @@ public class TransportRecoveryAction extends
TransportBroadcastOperationAction<RecoveryRequest, RecoveryResponse, TransportRecoveryAction.ShardRecoveryRequest, ShardRecoveryResponse> {
private final IndicesService indicesService;
private final RecoveryTarget recoveryTarget;
@Inject
public TransportRecoveryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, RecoveryTarget recoveryTarget, ActionFilters actionFilters) {
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, RecoveryAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.recoveryTarget = recoveryTarget;
}
@Override
@ -100,6 +96,12 @@ public class TransportRecoveryAction extends
} else {
ShardRecoveryResponse recoveryResponse = (ShardRecoveryResponse) shardResponse;
successfulShards++;
if (recoveryResponse.recoveryState() == null) {
// recovery not yet started
continue;
}
String indexName = recoveryResponse.getIndex();
List<ShardRecoveryResponse> responses = shardResponses.get(indexName);
@ -146,17 +148,6 @@ public class TransportRecoveryAction extends
ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId());
RecoveryState state = indexShard.recoveryState();
if (state == null) {
state = recoveryTarget.recoveryState(indexShard);
}
if (state == null) {
IndexShardGatewayService gatewayService =
indexService.shardInjectorSafe(request.shardId().id()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
}
shardRecoveryResponse.recoveryState(state);
return shardRecoveryResponse;
}

View File

@ -36,16 +36,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.translog.*;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.rest.RestStatus;
@ -56,7 +53,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -101,6 +97,9 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
}
}
/**
* Recovers the state of the shard from the gateway.
*/
public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException {
recoveryState.getIndex().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.INDEX);
@ -151,40 +150,31 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
throw new IndexShardGatewayRecoveryException(shardId(), "failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
recoveryState.getIndex().stopTime(System.currentTimeMillis());
// since we recover from local, just fill the files and size
try {
int numberOfFiles = 0;
long totalSizeInBytes = 0;
final RecoveryState.Index index = recoveryState.getIndex();
if (si != null) {
final Directory directory = indexShard.store().directory();
for (String name : Lucene.files(si)) {
numberOfFiles++;
long length = directory.fileLength(name);
totalSizeInBytes += length;
recoveryState.getIndex().addFileDetail(name, length, length);
index.addFileDetail(name, length, true);
}
}
RecoveryState.Index index = recoveryState.getIndex();
index.totalFileCount(numberOfFiles);
index.totalByteCount(totalSizeInBytes);
index.reusedFileCount(numberOfFiles);
index.reusedByteCount(totalSizeInBytes);
index.recoveredFileCount(numberOfFiles);
index.recoveredByteCount(totalSizeInBytes);
} catch (Exception e) {
// ignore
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
recoveryState.getStart().startTime(System.currentTimeMillis());
final RecoveryState.Start stateStart = recoveryState.getStart();
stateStart.startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.START);
if (translogId == -1) {
// no translog files, bail
indexShard.postRecovery("post recovery from gateway, no translog for id [" + translogId + "]");
// no index, just start the shard and bail
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
stateStart.stopTime(System.currentTimeMillis());
stateStart.checkIndexTime(indexShard.checkIndexTook());
return;
}
@ -209,15 +199,15 @@ public class IndexShardGateway extends AbstractIndexShardComponent implements Cl
// no translog files, bail
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
stateStart.stopTime(System.currentTimeMillis());
stateStart.checkIndexTime(0);
return;
}
// recover from the translog file
indexShard.performRecoveryPrepareForTranslog();
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
stateStart.stopTime(System.currentTimeMillis());
stateStart.checkIndexTime(0);
recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;
@ -51,8 +50,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final IndexShardSnapshotAndRestoreService snapshotService;
private RecoveryState recoveryState;
@Inject
public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexShard indexShard, IndexShardGateway shardGateway, IndexShardSnapshotAndRestoreService snapshotService, ClusterService clusterService) {
@ -61,8 +58,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
this.indexShard = indexShard;
this.shardGateway = shardGateway;
this.snapshotService = snapshotService;
this.recoveryState = new RecoveryState(shardId);
this.recoveryState.setType(RecoveryState.Type.GATEWAY);
this.clusterService = clusterService;
}
@ -80,13 +75,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
void onRecoveryFailed(IndexShardGatewayRecoveryException e);
}
public RecoveryState recoveryState() {
if (recoveryState.getTimer().startTime() > 0 && recoveryState.getStage() != RecoveryState.Stage.DONE) {
recoveryState.getTimer().time(System.currentTimeMillis() - recoveryState.getTimer().startTime());
}
return recoveryState;
}
/**
* Recovers the state of the shard from the gateway.
*/
@ -102,9 +90,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
}
try {
if (indexShard.routingEntry().restoreSource() != null) {
indexShard.recovering("from snapshot");
indexShard.recovering("from snapshot", RecoveryState.Type.SNAPSHOT, indexShard.routingEntry().restoreSource());
} else {
indexShard.recovering("from gateway");
indexShard.recovering("from gateway", RecoveryState.Type.GATEWAY, clusterService.localNode());
}
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
@ -112,24 +100,20 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
final RecoveryState recoveryState = indexShard.recoveryState();
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
recoveryState.getTimer().startTime(System.currentTimeMillis());
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setStage(RecoveryState.Stage.INIT);
recoveryState.setPrimary(indexShard.routingEntry().primary());
try {
if (indexShard.routingEntry().restoreSource() != null) {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
recoveryState.setType(RecoveryState.Type.SNAPSHOT);
recoveryState.setRestoreSource(indexShard.routingEntry().restoreSource());
snapshotService.restore(recoveryState);
} else {
logger.debug("starting recovery from {} ...", shardGateway);
recoveryState.setType(RecoveryState.Type.GATEWAY);
recoveryState.setSourceNode(clusterService.localNode());
shardGateway.recover(indexShouldExists, recoveryState);
}
@ -145,17 +129,23 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// refresh the shard
indexShard.refresh("post_gateway");
recoveryState.getTimer().time(System.currentTimeMillis() - recoveryState.getTimer().startTime());
recoveryState.setStage(RecoveryState.Stage.DONE);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n");
sb.append(" index : files [").append(recoveryState.getIndex().totalFileCount()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().totalByteCount())).append("], took[").append(TimeValue.timeValueMillis(recoveryState.getIndex().time())).append("]\n");
sb.append(" : recovered_files [").append(recoveryState.getIndex().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().recoveredTotalSize())).append("]\n");
sb.append(" : reusing_files [").append(recoveryState.getIndex().reusedFileCount()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().reusedByteCount())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [").append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
RecoveryState.Index index = recoveryState.getIndex();
sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.totalBytes())).append("], took[")
.append(TimeValue.timeValueMillis(index.time())).append("]\n");
sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.recoveredBytes())).append("]\n");
sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [")
.append(new ByteSizeValue(index.reusedBytes())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [")
.append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations())
.append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("recovery completed from [{}], took [{}]", shardGateway, timeValueMillis(recoveryState.getTimer().time()));

View File

@ -32,9 +32,12 @@ import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -43,6 +46,7 @@ import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
@ -64,12 +68,6 @@ import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
@ -153,6 +151,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexService indexService;
private final ShardSuggestService shardSuggestService;
private final ShardBitsetFilterCache shardBitsetFilterCache;
private final DiscoveryNode localNode;
private final Object mutex = new Object();
private final String checkIndexOnStartup;
@ -192,7 +191,8 @@ public class IndexShard extends AbstractIndexShardComponent {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
ClusterService clusterService) {
super(shardId, indexSettings);
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
@ -223,6 +223,8 @@ public class IndexShard extends AbstractIndexShardComponent {
this.codecService = codecService;
this.shardSuggestService = shardSuggestService;
this.shardBitsetFilterCache = shardBitsetFilterCache;
assert clusterService.lifecycleState() == Lifecycle.State.STARTED; // otherwise localNode is still none;
this.localNode = clusterService.localNode();
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
@ -358,10 +360,23 @@ public class IndexShard extends AbstractIndexShardComponent {
return this;
}
/**
* Marks the shard as recovering, fails with exception is recovering is not allowed to be set.
* Marks the shard as recovering based on a remote or local node, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason) throws IndexShardStartedException,
public IndexShardState recovering(String reason, RecoveryState.Type type, DiscoveryNode sourceNode) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, sourceNode, localNode));
}
/**
* Marks the shard as recovering based on a restore, fails with exception is recovering is not allowed to be set.
*/
public IndexShardState recovering(String reason, RecoveryState.Type type, RestoreSource restoreSource) throws IndexShardStartedException {
return recovering(reason, new RecoveryState(shardId, shardRouting.primary(), type, restoreSource, localNode));
}
private IndexShardState recovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
@ -379,6 +394,7 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state == IndexShardState.POST_RECOVERY) {
throw new IndexShardRecoveringException(shardId);
}
this.recoveryState = recoveryState;
return changeState(IndexShardState.RECOVERING, reason);
}
}
@ -766,17 +782,13 @@ public class IndexShard extends AbstractIndexShardComponent {
}
/**
* The peer recovery state if this shard recovered from a peer shard, null o.w.
* Returns the current {@link RecoveryState} if this shard is recovering or has been recovering.
* Returns null if the recovery has not yet started or shard was not recovered (created via an API).
*/
public RecoveryState recoveryState() {
return this.recoveryState;
}
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
performRecoveryFinalization(withFlush);
this.recoveryState = recoveryState;
}
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
if (withFlush) {
engine().flush();
@ -1171,7 +1183,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return engine;
}
class ShardEngineFailListener implements Engine.FailedEngineListener {
class ShardEngineFailListener implements Engine.FailedEngineListener {
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();
// called by the current engine

View File

@ -18,8 +18,8 @@
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -33,7 +33,6 @@ import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
@ -82,14 +81,14 @@ public final class ShadowIndexShard extends IndexShard {
IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache,
ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
MergePolicyProvider mergePolicyProvider, EngineFactory factory, ClusterService clusterService) {
super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler,
translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indexingService, getService, searchService, shardWarmerService, shardFilterCache,
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService, shardSuggestService,
shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService,
mergePolicyProvider, factory);
mergePolicyProvider, factory, clusterService);
}
/**

View File

@ -22,7 +22,10 @@ package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.lucene.index.*;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@ -45,8 +48,8 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.*;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@ -160,7 +163,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
try {
recoveryState.getIndex().startTime(System.currentTimeMillis());
snapshotContext.restore();
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
recoveryState.getIndex().stopTime(System.currentTimeMillis());
} catch (Throwable e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId.getSnapshot() + "]", e);
}
@ -710,10 +713,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfReusedFiles = 0;
long reusedTotalSize = 0;
final Store.MetadataSnapshot recoveryTargetMetadata;
try {
recoveryTargetMetadata = store.getMetadataOrEmpty();
@ -743,11 +742,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
FileInfo fileInfo = fileInfos.get(md.name());
numberOfFiles++;
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += md.length();
recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
}
@ -755,10 +750,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
FileInfo fileInfo = fileInfos.get(md.name());
numberOfFiles++;
totalSize += fileInfo.length();
filesToRecover.add(fileInfo);
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", shardId, snapshotId, fileInfo.physicalName(), fileInfo.name());
@ -768,16 +761,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
}
}
final RecoveryState.Index index = recoveryState.getIndex();
index.totalFileCount(numberOfFiles);
index.totalByteCount(totalSize);
index.reusedFileCount(numberOfReusedFiles);
index.reusedByteCount(reusedTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, numberOfFiles, new ByteSizeValue(totalSize), numberOfReusedFiles, new ByteSizeValue(reusedTotalSize));
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId,
index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
}
try {
for (final FileInfo fileToRecover : filesToRecover) {
@ -814,16 +804,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/
private void restoreFile(final FileInfo fileInfo) throws IOException {
boolean success = false;
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while((length=stream.read(buffer))>0){
indexOutput.writeBytes(buffer,0,length);
if (file != null) {
file.updateRecovered(length);
}
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length));
}
@ -838,7 +825,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.aliases.IndexAlias;
@ -56,18 +57,20 @@ import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.collect.Maps.newHashMap;
@ -846,6 +849,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
}
private void removeIndex(String index, String reason) {
try {
indicesService.removeIndex(index, reason);

View File

@ -58,9 +58,9 @@ public class RecoveriesCollection {
*
* @return the id of the new recovery.
*/
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state,
public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode,
RecoveryTarget.RecoveryListener listener, TimeValue activityTimeout) {
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, state, listener);
RecoveryStatus status = new RecoveryStatus(indexShard, sourceNode, listener);
RecoveryStatus existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status);
assert existingStatus == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId());
@ -150,6 +150,10 @@ public class RecoveriesCollection {
return null;
}
/**
 * Returns the number of ongoing recoveries tracked by this collection.
 *
 * @return count of {@code RecoveryStatus} entries currently registered
 */
public int size() {
return onGoingRecoveries.size();
}
/** cancel all ongoing recoveries for the given shard. typically because the shards is closed */
public void cancelRecoveriesForShard(ShardId shardId, String reason) {

View File

@ -40,21 +40,18 @@ class RecoveryFilesInfoRequest extends TransportRequest {
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
List<Long> phase1ExistingFileSizes;
long phase1TotalSize;
long phase1ExistingTotalSize;
RecoveryFilesInfoRequest() {
}
RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) {
RecoveryFilesInfoRequest(long recoveryId, ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes,
List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.phase1TotalSize = phase1TotalSize;
this.phase1ExistingTotalSize = phase1ExistingTotalSize;
}
public long recoveryId() {
@ -93,9 +90,6 @@ class RecoveryFilesInfoRequest extends TransportRequest {
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
}
@Override
@ -123,8 +117,5 @@ class RecoveryFilesInfoRequest extends TransportRequest {
for (Long phase1ExistingFileSize : phase1ExistingFileSizes) {
out.writeVLong(phase1ExistingFileSize);
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
}
}

View File

@ -19,25 +19,26 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Keeps track of state related to shard recovery.
@ -114,10 +115,10 @@ public class RecoveryState implements ToXContent, Streamable {
private volatile Stage stage = Stage.INIT;
private Index index = new Index();
private Translog translog = new Translog();
private Start start = new Start();
private Timer timer = new Timer();
private final Index index = new Index();
private final Translog translog = new Translog();
private final Start start = new Start();
private final Timer timer = new Timer();
private Type type;
private ShardId shardId;
@ -125,13 +126,26 @@ public class RecoveryState implements ToXContent, Streamable {
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private boolean detailed = false;
private boolean primary = false;
private volatile boolean primary = false;
public RecoveryState() { }
private RecoveryState() {
}
public RecoveryState(ShardId shardId) {
public RecoveryState(ShardId shardId, boolean primary, Type type, DiscoveryNode sourceNode, DiscoveryNode targetNode) {
this(shardId, primary, type, sourceNode, null, targetNode);
}
public RecoveryState(ShardId shardId, boolean primary, Type type, RestoreSource restoreSource, DiscoveryNode targetNode) {
this(shardId, primary, type, null, restoreSource, targetNode);
}
private RecoveryState(ShardId shardId, boolean primary, Type type, @Nullable DiscoveryNode sourceNode, @Nullable RestoreSource restoreSource, DiscoveryNode targetNode) {
this.shardId = shardId;
this.primary = primary;
this.type = type;
this.sourceNode = sourceNode;
this.restoreSource = restoreSource;
this.targetNode = targetNode;
}
public ShardId getShardId() {
@ -170,43 +184,18 @@ public class RecoveryState implements ToXContent, Streamable {
return type;
}
public void setType(Type type) {
this.type = type;
}
public void setSourceNode(DiscoveryNode sourceNode) {
this.sourceNode = sourceNode;
}
public DiscoveryNode getSourceNode() {
return sourceNode;
}
public void setTargetNode(DiscoveryNode targetNode) {
this.targetNode = targetNode;
}
public DiscoveryNode getTargetNode() {
return targetNode;
}
public void setRestoreSource(RestoreSource restoreSource) {
this.restoreSource = restoreSource;
}
public RestoreSource getRestoreSource() {
return restoreSource;
}
public void setDetailed(boolean detailed) {
this.detailed = detailed;
this.index.detailed(detailed);
}
public void setPrimary(boolean primary) {
this.primary = primary;
}
public boolean getPrimary() {
return primary;
}
@ -221,7 +210,7 @@ public class RecoveryState implements ToXContent, Streamable {
public void readFrom(StreamInput in) throws IOException {
timer.startTime(in.readVLong());
timer.stopTime(in.readVLong());
timer.time(in.readVLong());
timer.time = in.readVLong();
type = Type.fromId(in.readByte());
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
@ -230,10 +219,9 @@ public class RecoveryState implements ToXContent, Streamable {
if (in.readBoolean()) {
sourceNode = DiscoveryNode.readNode(in);
}
index = Index.readIndex(in);
translog = Translog.readTranslog(in);
start = Start.readStart(in);
detailed = in.readBoolean();
index.readFrom(in);
translog.readFrom(in);
start.readFrom(in);
primary = in.readBoolean();
}
@ -254,7 +242,6 @@ public class RecoveryState implements ToXContent, Streamable {
index.writeTo(out);
translog.writeTo(out);
start.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(primary);
}
@ -265,9 +252,11 @@ public class RecoveryState implements ToXContent, Streamable {
builder.field(Fields.TYPE, type.toString());
builder.field(Fields.STAGE, stage.toString());
builder.field(Fields.PRIMARY, primary);
builder.timeValueField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime);
builder.timeValueField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime);
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, timer.time);
builder.dateValueField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime);
if (timer.stopTime > 0) {
builder.dateValueField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime);
}
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, timer.time());
if (restoreSource != null) {
builder.field(Fields.SOURCE);
@ -291,7 +280,6 @@ public class RecoveryState implements ToXContent, Streamable {
builder.endObject();
builder.startObject(Fields.INDEX);
index.detailed(this.detailed);
index.toXContent(builder, params);
builder.endObject();
@ -327,21 +315,25 @@ public class RecoveryState implements ToXContent, Streamable {
static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog");
static final XContentBuilderString START = new XContentBuilderString("start");
static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered");
static final XContentBuilderString RECOVERED_IN_BYTES = new XContentBuilderString("recovered_in_bytes");
static final XContentBuilderString CHECK_INDEX_TIME = new XContentBuilderString("check_index_time");
static final XContentBuilderString CHECK_INDEX_TIME_IN_MILLIS = new XContentBuilderString("check_index_time_in_millis");
static final XContentBuilderString LENGTH = new XContentBuilderString("length");
static final XContentBuilderString LENGTH_IN_BYTES = new XContentBuilderString("length_in_bytes");
static final XContentBuilderString FILES = new XContentBuilderString("files");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString TOTAL_IN_BYTES = new XContentBuilderString("total_in_bytes");
static final XContentBuilderString REUSED = new XContentBuilderString("reused");
static final XContentBuilderString REUSED_IN_BYTES = new XContentBuilderString("reused_in_bytes");
static final XContentBuilderString PERCENT = new XContentBuilderString("percent");
static final XContentBuilderString DETAILS = new XContentBuilderString("details");
static final XContentBuilderString BYTES = new XContentBuilderString("bytes");
static final XContentBuilderString SIZE = new XContentBuilderString("size");
}
public static class Timer {
private long startTime = 0;
private long time = 0;
private long stopTime = 0;
private volatile long startTime = 0;
private volatile long time = 0;
private volatile long stopTime = 0;
public long startTime() {
return startTime;
@ -352,11 +344,13 @@ public class RecoveryState implements ToXContent, Streamable {
}
public long time() {
return time;
}
public void time(long time) {
this.time = time;
if (startTime == 0) {
return 0;
}
if (time > 0) {
return time;
}
return Math.max(0, System.currentTimeMillis() - startTime);
}
public long stopTime() {
@ -364,14 +358,15 @@ public class RecoveryState implements ToXContent, Streamable {
}
public void stopTime(long stopTime) {
this.time = Math.max(0, stopTime - startTime);
this.stopTime = stopTime;
}
}
public static class Start implements ToXContent, Streamable {
private long startTime;
private long time;
private long checkIndexTime;
private volatile long startTime;
private volatile long time;
private volatile long checkIndexTime;
public long startTime() {
return this.startTime;
@ -385,8 +380,8 @@ public class RecoveryState implements ToXContent, Streamable {
return this.time;
}
public void time(long time) {
this.time = time;
public void stopTime(long time) {
this.time = Math.max(0, time - startTime);
}
public long checkIndexTime() {
@ -397,12 +392,6 @@ public class RecoveryState implements ToXContent, Streamable {
this.checkIndexTime = checkIndexTime;
}
public static Start readStart(StreamInput in) throws IOException {
Start start = new Start();
start.readFrom(in);
return start;
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
@ -426,9 +415,9 @@ public class RecoveryState implements ToXContent, Streamable {
}
public static class Translog implements ToXContent, Streamable {
private long startTime = 0;
private long time;
private volatile int currentTranslogOperations = 0;
private volatile long startTime = 0;
private volatile long time;
private final AtomicInteger currentTranslogOperations = new AtomicInteger();
public long startTime() {
return this.startTime;
@ -447,59 +436,83 @@ public class RecoveryState implements ToXContent, Streamable {
}
public void addTranslogOperations(int count) {
this.currentTranslogOperations += count;
this.currentTranslogOperations.addAndGet(count);
}
public void incrementTranslogOperations() {
this.currentTranslogOperations++;
this.currentTranslogOperations.incrementAndGet();
}
public int currentTranslogOperations() {
return this.currentTranslogOperations;
}
public static Translog readTranslog(StreamInput in) throws IOException {
Translog translog = new Translog();
translog.readFrom(in);
return translog;
return this.currentTranslogOperations.get();
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
currentTranslogOperations = in.readVInt();
currentTranslogOperations.set(in.readVInt());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(time);
out.writeVInt(currentTranslogOperations);
out.writeVInt(currentTranslogOperations.get());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.RECOVERED, currentTranslogOperations);
builder.field(Fields.RECOVERED, currentTranslogOperations.get());
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time);
return builder;
}
}
public static class File implements ToXContent, Streamable {
String name;
long length;
long recovered;
private String name;
private long length;
private long recovered;
private boolean reused;
public File() { }
public File(String name, long length) {
this.name = name;
this.length = length;
public File() {
}
public void updateRecovered(long length) {
recovered += length;
public File(String name, long length, boolean reused) {
assert name != null;
this.name = name;
this.length = length;
this.reused = reused;
}
/**
 * Adds {@code bytes} to the running count of bytes copied over for this file.
 * Only valid for files actually being recovered — a reused file never
 * receives recovered bytes (enforced by assertion, not at runtime).
 *
 * @param bytes newly recovered byte count; must be non-negative
 */
void addRecoveredBytes(long bytes) {
assert reused == false : "file is marked as reused, can't update recovered bytes";
assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]";
recovered += bytes;
}
/** file name */
public String name() {
return name;
}
/** file length */
public long length() {
return length;
}
/** number of bytes recovered for this file (so far). 0 if the file is reused */
public long recovered() {
return recovered;
}
/** returns true if the file is reused from a local copy */
public boolean reused() {
return reused;
}
boolean fullyRecovered() {
return reused == false && length == recovered;
}
public static File readFile(StreamInput in) throws IOException {
@ -513,6 +526,7 @@ public class RecoveryState implements ToXContent, Streamable {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}
@Override
@ -520,84 +534,57 @@ public class RecoveryState implements ToXContent, Streamable {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
out.writeBoolean(reused);
}
/**
 * Renders this file's recovery progress: name, length (human + bytes),
 * whether it was reused from the local store, and bytes recovered so far.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject();
    builder.field(Fields.NAME, name);
    builder.byteSizeField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, length);
    builder.field(Fields.REUSED, reused);
    // report the recovered byte count, not the file length — reporting
    // `length` here would always show the file as fully recovered
    builder.byteSizeField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, recovered);
    builder.endObject();
    return builder;
}
/**
 * Two files are equal when name, length, reused flag and recovered byte
 * count all match.
 */
@Override
public boolean equals(Object obj) {
    if (obj instanceof File) {
        File other = (File) obj;
        return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered();
    }
    return false;
}

/**
 * Hash code consistent with {@link #equals(Object)} — required by the
 * {@code Object} contract so instances behave correctly in hash-based
 * collections (file details are kept in a map).
 */
@Override
public int hashCode() {
    int result = name.hashCode();
    result = 31 * result + (int) (length ^ (length >>> 32));
    result = 31 * result + (int) (recovered ^ (recovered >>> 32));
    result = 31 * result + (reused ? 1 : 0);
    return result;
}
@Override
public String toString() {
return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])";
}
}
public static class Index implements ToXContent, Streamable {
private long startTime = 0;
private long time = 0;
private volatile long startTime = 0;
private volatile long time = 0;
private List<File> fileDetails = new ArrayList<>();
private List<File> reusedFileDetails = new ArrayList<>();
private Map<String, File> fileDetails = ConcurrentCollections.newConcurrentMap();
private long version = -1;
private boolean detailed = false;
private int totalFileCount = 0;
private int reusedFileCount = 0;
private AtomicInteger recoveredFileCount = new AtomicInteger();
private long totalByteCount = 0;
private long reusedByteCount = 0;
private AtomicLong recoveredByteCount = new AtomicLong();
private volatile long version = -1;
public List<File> fileDetails() {
return fileDetails;
return ImmutableList.copyOf(fileDetails.values());
}
public List<File> reusedFileDetails() {
return reusedFileDetails;
public void addFileDetail(String name, long length, boolean reused) {
File file = new File(name, length, reused);
File existing = fileDetails.put(name, file);
assert existing == null : "file [" + name + "] is already reported";
}
public void addFileDetail(String name, long length) {
fileDetails.add(new File(name, length));
}
public void addFileDetail(String name, long length, long recovered) {
File file = new File(name, length);
file.recovered = recovered;
fileDetails.add(file);
}
public void addFileDetails(List<String> names, List<Long> lengths) {
for (int i = 0; i < names.size(); i++) {
fileDetails.add(new File(names.get(i), lengths.get(i)));
}
}
public void addReusedFileDetail(String name, long length) {
reusedFileDetails.add(new File(name, length));
}
public void addReusedFileDetails(List<String> names, List<Long> lengths) {
for (int i = 0; i < names.size(); i++) {
reusedFileDetails.add(new File(names.get(i), lengths.get(i)));
}
}
public File file(String name) {
for (File file : fileDetails) {
if (file.name.equals(name))
return file;
}
for (File file : reusedFileDetails) {
if (file.name.equals(name)) {
return file;
}
}
return null;
public void addRecoveredBytesToFile(String name, long bytes) {
File file = fileDetails.get(name);
file.addRecoveredBytes(bytes);
}
public long startTime() {
@ -612,141 +599,158 @@ public class RecoveryState implements ToXContent, Streamable {
return this.time;
}
public void time(long time) {
this.time = time;
public void stopTime(long stopTime) {
assert stopTime >= 0;
this.time = Math.max(0, stopTime - startTime);
}
public long version() {
return this.version;
}
/** total number of files that are part of this recovery, both re-used and recovered */
public int totalFileCount() {
return totalFileCount;
return fileDetails.size();
}
public void totalFileCount(int totalFileCount) {
this.totalFileCount = totalFileCount;
/** total number of files to be recovered (potentially not yet done) */
public int totalRecoverFiles() {
    int pending = 0;
    for (File file : fileDetails.values()) {
        if (!file.reused()) {
            pending++;
        }
    }
    return pending;
}
/** number of files that were fully recovered (excluding ongoing files) */
public int recoveredFileCount() {
return recoveredFileCount.get();
int count = 0;
for (File file : fileDetails.values()) {
if (file.fullyRecovered()) {
count++;
}
}
return count;
}
public void recoveredFileCount(int recoveredFileCount) {
this.recoveredFileCount.set(recoveredFileCount);
}
public void addRecoveredFileCount(int updatedCount) {
this.recoveredFileCount.addAndGet(updatedCount);
}
public float percentFilesRecovered() {
if (totalFileCount == 0) { // indicates we are still in init phase
/** percent of recovered (i.e., not reused) files out of the total files to be recovered */
public float recoveredFilesPercent() {
int total = 0;
int recovered = 0;
for (File file : fileDetails.values()) {
if (file.reused() == false) {
total++;
if (file.fullyRecovered()) {
recovered++;
}
}
}
if (total == 0 && fileDetails.size() == 0) { // indicates we are still in init phase
return 0.0f;
}
final int filesRecovered = recoveredFileCount.get();
if ((totalFileCount - filesRecovered) == 0) {
if (total == recovered) {
return 100.0f;
} else {
float result = 100.0f * (filesRecovered / (float)totalFileCount);
float result = 100.0f * (recovered / (float) total);
return result;
}
}
public int numberOfRecoveredFiles() {
return totalFileCount - reusedFileCount;
/** total number of bytes in the shard */
public long totalBytes() {
long total = 0;
for (File file : fileDetails.values()) {
total += file.length();
}
return total;
}
public long totalByteCount() {
return this.totalByteCount;
/** total number of bytes recovered so far, including both existing and reused */
public long recoveredBytes() {
long recovered = 0;
for (File file : fileDetails.values()) {
recovered += file.recovered();
}
return recovered;
}
public void totalByteCount(long totalByteCount) {
this.totalByteCount = totalByteCount;
/** total bytes of files to be recovered (potentially not yet done) */
public long totalRecoverBytes() {
long total = 0;
for (File file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length();
}
}
return total;
}
public long recoveredByteCount() {
return recoveredByteCount.longValue();
public long totalReuseBytes() {
long total = 0;
for (File file : fileDetails.values()) {
if (file.reused()) {
total += file.length();
}
}
return total;
}
public void recoveredByteCount(long recoveredByteCount) {
this.recoveredByteCount.set(recoveredByteCount);
}
public void addRecoveredByteCount(long updatedSize) {
recoveredByteCount.addAndGet(updatedSize);
}
public long numberOfRecoveredBytes() {
return recoveredByteCount.get() - reusedByteCount;
}
public float percentBytesRecovered() {
if (totalByteCount == 0) { // indicates we are still in init phase
/** percent of bytes recovered out of total files bytes *to be* recovered */
public float recoveredBytesPercent() {
long total = 0;
long recovered = 0;
for (File file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length();
recovered += file.recovered();
}
}
if (total == 0 && fileDetails.size() == 0) {
// indicates we are still in init phase
return 0.0f;
}
final long recByteCount = recoveredByteCount.get();
if ((totalByteCount - recByteCount) == 0) {
if (total == recovered) {
return 100.0f;
} else {
float result = 100.0f * (recByteCount / (float) totalByteCount);
return result;
return 100.0f * recovered / total;
}
}
public int reusedFileCount() {
return reusedFileCount;
int reused = 0;
for (File file : fileDetails.values()) {
if (file.reused()) {
reused++;
}
}
return reused;
}
public void reusedFileCount(int reusedFileCount) {
this.reusedFileCount = reusedFileCount;
}
public long reusedByteCount() {
return this.reusedByteCount;
}
public void reusedByteCount(long reusedByteCount) {
this.reusedByteCount = reusedByteCount;
}
public long recoveredTotalSize() {
return totalByteCount - reusedByteCount;
public long reusedBytes() {
long reused = 0;
for (File file : fileDetails.values()) {
if (file.reused()) {
reused += file.length();
}
}
return reused;
}
public void updateVersion(long version) {
this.version = version;
}
public void detailed(boolean detailed) {
this.detailed = detailed;
}
public static Index readIndex(StreamInput in) throws IOException {
Index index = new Index();
index.readFrom(in);
return index;
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
totalFileCount = in.readVInt();
totalByteCount = in.readVLong();
reusedFileCount = in.readVInt();
reusedByteCount = in.readVLong();
recoveredFileCount = new AtomicInteger(in.readVInt());
recoveredByteCount = new AtomicLong(in.readVLong());
int size = in.readVInt();
fileDetails = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
fileDetails.add(File.readFile(in));
}
size = in.readVInt();
reusedFileDetails = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
reusedFileDetails.add(File.readFile(in));
File file = File.readFile(in);
fileDetails.put(file.name, file);
}
}
@ -754,53 +758,37 @@ public class RecoveryState implements ToXContent, Streamable {
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(time);
out.writeVInt(totalFileCount);
out.writeVLong(totalByteCount);
out.writeVInt(reusedFileCount);
out.writeVLong(reusedByteCount);
out.writeVInt(recoveredFileCount.get());
out.writeVLong(recoveredByteCount.get());
out.writeVInt(fileDetails.size());
for (File file : fileDetails) {
file.writeTo(out);
}
out.writeVInt(reusedFileDetails.size());
for (File file : reusedFileDetails) {
final File[] files = fileDetails.values().toArray(new File[0]);
out.writeVInt(files.length);
for (File file : files) {
file.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
int filesRecovered = recoveredFileCount.get();
long bytesRecovered = recoveredByteCount.get();
// stream size first, as it matters more and the files section can be long
builder.startObject(Fields.SIZE);
builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, totalBytes());
builder.byteSizeField(Fields.REUSED_IN_BYTES, Fields.REUSED, totalBytes());
builder.byteSizeField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, recoveredBytes());
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent()));
builder.endObject();
builder.startObject(Fields.FILES);
builder.field(Fields.TOTAL, totalFileCount);
builder.field(Fields.REUSED, reusedFileCount);
builder.field(Fields.RECOVERED, filesRecovered);
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", percentFilesRecovered()));
if (detailed) {
builder.field(Fields.TOTAL, totalFileCount());
builder.field(Fields.REUSED, reusedFileCount());
builder.field(Fields.RECOVERED, recoveredFileCount());
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent()));
if (params.paramAsBoolean("details", false)) {
builder.startArray(Fields.DETAILS);
for (File file : fileDetails) {
file.toXContent(builder, params);
}
for (File file : reusedFileDetails) {
for (File file : fileDetails.values()) {
file.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
builder.startObject(Fields.BYTES);
builder.field(Fields.TOTAL, totalByteCount);
builder.field(Fields.REUSED, reusedByteCount);
builder.field(Fields.RECOVERED, bytesRecovered);
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", percentBytesRecovered()));
builder.endObject();
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time);
return builder;
}

View File

@ -57,7 +57,6 @@ public class RecoveryStatus extends AbstractRefCounted {
private final ShardId shardId;
private final long recoveryId;
private final IndexShard indexShard;
private final RecoveryState state;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final Store store;
@ -73,7 +72,7 @@ public class RecoveryStatus extends AbstractRefCounted {
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();
public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryState state, RecoveryTarget.RecoveryListener listener) {
public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTarget.RecoveryListener listener) {
super("recovery_status");
this.recoveryId = idGenerator.incrementAndGet();
@ -82,9 +81,9 @@ public class RecoveryStatus extends AbstractRefCounted {
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.state = state;
this.state.getTimer().startTime(System.currentTimeMillis());
this.tempFilePrefix = RECOVERY_PREFIX + this.state.getTimer().startTime() + ".";
final RecoveryState.Timer timer = this.indexShard.recoveryState().getTimer();
timer.startTime(System.currentTimeMillis());
this.tempFilePrefix = RECOVERY_PREFIX + timer.startTime() + ".";
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
@ -110,7 +109,7 @@ public class RecoveryStatus extends AbstractRefCounted {
}
public RecoveryState state() {
return state;
return indexShard.recoveryState();
}
public CancellableThreads CancellableThreads() {
@ -133,11 +132,11 @@ public class RecoveryStatus extends AbstractRefCounted {
}
public void stage(RecoveryState.Stage stage) {
state.setStage(stage);
state().setStage(stage);
}
public RecoveryState.Stage stage() {
return state.getStage();
return state().getStage();
}
public Store.LegacyChecksums legacyChecksums() {
@ -178,7 +177,7 @@ public class RecoveryStatus extends AbstractRefCounted {
public void fail(RecoveryFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
listener.onRecoveryFailure(state, e, sendShardFailure);
listener.onRecoveryFailure(state(), e, sendShardFailure);
} finally {
try {
cancellableThreads.cancel("failed recovery [" + e.getMessage() + "]");
@ -196,7 +195,7 @@ public class RecoveryStatus extends AbstractRefCounted {
assert tempFileNames.isEmpty() : "not all temporary files are renamed";
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
listener.onRecoveryDone(state);
listener.onRecoveryDone(state());
}
}

View File

@ -119,9 +119,6 @@ public class RecoveryTarget extends AbstractComponent {
return null;
}
final RecoveryStatus recoveryStatus = statusRef.status();
if (recoveryStatus.state().getTimer().startTime() > 0 && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
recoveryStatus.state().getTimer().time(System.currentTimeMillis() - recoveryStatus.state().getTimer().startTime());
}
return recoveryStatus.state();
} catch (Exception e) {
// shouldn't really happen, but have to be here due to auto close
@ -131,19 +128,14 @@ public class RecoveryTarget extends AbstractComponent {
public void startRecovery(final IndexShard indexShard, final RecoveryState.Type recoveryType, final DiscoveryNode sourceNode, final RecoveryListener listener) {
try {
indexShard.recovering("from " + sourceNode);
indexShard.recovering("from " + sourceNode, recoveryType, sourceNode);
} catch (IllegalIndexShardStateException e) {
// that's fine, since we might be called concurrently, just ignore this, we are already recovering
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
return;
}
// create a new recovery status, and process...
RecoveryState recoveryState = new RecoveryState(indexShard.shardId());
recoveryState.setType(recoveryType);
recoveryState.setSourceNode(sourceNode);
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setPrimary(indexShard.routingEntry().primary());
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
@ -311,8 +303,7 @@ public class RecoveryTarget extends AbstractComponent {
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
recoveryStatus.indexShard().performRecoveryFinalization(false, recoveryStatus.state());
recoveryStatus.state().getTimer().time(System.currentTimeMillis() - recoveryStatus.state().getTimer().startTime());
recoveryStatus.indexShard().performRecoveryFinalization(false);
recoveryStatus.stage(RecoveryState.Stage.DONE);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -363,12 +354,12 @@ public class RecoveryTarget extends AbstractComponent {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Index index = recoveryStatus.state().getIndex();
index.addFileDetails(request.phase1FileNames, request.phase1FileSizes);
index.addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
index.totalByteCount(request.phase1TotalSize);
index.totalFileCount(request.phase1FileNames.size() + request.phase1ExistingFileNames.size());
index.reusedByteCount(request.phase1ExistingTotalSize);
index.reusedFileCount(request.phase1ExistingFileNames.size());
for (int i = 0; i < request.phase1ExistingFileNames.size(); i++) {
index.addFileDetail(request.phase1ExistingFileNames.get(i), request.phase1ExistingFileSizes.get(i), true);
}
for (int i = 0; i < request.phase1FileNames.size(); i++) {
index.addFileDetail(request.phase1FileNames.get(i), request.phase1FileSizes.get(i), false);
}
// recoveryBytesCount / recoveryFileCount will be set as we go...
recoveryStatus.stage(RecoveryState.Stage.INDEX);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -454,11 +445,7 @@ public class RecoveryTarget extends AbstractComponent {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
recoveryStatus.state().getIndex().addRecoveredByteCount(content.length());
RecoveryState.File file = recoveryStatus.state().getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
recoveryStatus.state().getIndex().addRecoveredBytesToFile(request.name(), content.length());
if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
try {
Store.verify(indexOutput);
@ -472,7 +459,6 @@ public class RecoveryTarget extends AbstractComponent {
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName);
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = recoveryStatus.removeOpenIndexOutputs(request.name());
recoveryStatus.state().getIndex().addRecoveredFileCount(1);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
}

View File

@ -42,7 +42,6 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -65,7 +64,6 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
@ -201,8 +199,7 @@ public class ShardRecoveryHandler implements Engine.RecoveryHandler {
@Override
public void run() throws InterruptedException {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
response.phase1TotalSize, response.phase1ExistingTotalSize);
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();

View File

@ -30,7 +30,10 @@ import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestResponseListener;
import org.elasticsearch.rest.action.support.RestTable;
@ -89,10 +92,12 @@ public class RestRecoveryAction extends AbstractCatAction {
.addCell("target_host", "alias:thost;desc:target host")
.addCell("repository", "alias:rep;desc:repository")
.addCell("snapshot", "alias:snap;desc:snapshot")
.addCell("files", "alias:f;desc:number of files")
.addCell("files", "alias:f;desc:number of files to recover")
.addCell("files_percent", "alias:fp;desc:percent of files recovered")
.addCell("bytes", "alias:b;desc:size in bytes")
.addCell("bytes", "alias:b;desc:size to recover in bytes")
.addCell("bytes_percent", "alias:bp;desc:percent of bytes recovered")
.addCell("total_files", "alias:tf;desc:total number of files")
.addCell("total_bytes", "alias:tb;desc:total number of bytes")
.endHeaders();
return t;
}
@ -145,10 +150,12 @@ public class RestRecoveryAction extends AbstractCatAction {
t.addCell(state.getTargetNode().getHostName());
t.addCell(state.getRestoreSource() == null ? "n/a" : state.getRestoreSource().snapshotId().getRepository());
t.addCell(state.getRestoreSource() == null ? "n/a" : state.getRestoreSource().snapshotId().getSnapshot());
t.addCell(state.getIndex().totalRecoverFiles());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredFilesPercent()));
t.addCell(state.getIndex().totalRecoverBytes());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
t.addCell(state.getIndex().totalFileCount());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().percentFilesRecovered()));
t.addCell(state.getIndex().totalByteCount());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().percentBytesRecovered()));
t.addCell(state.getIndex().totalBytes());
t.endRow();
}
}

View File

@ -133,7 +133,7 @@ public class ReplicaRecoveryBenchmark {
long bytes;
if (indexRecoveries.size() > 0) {
translogOps = indexRecoveries.get(0).recoveryState().getTranslog().currentTranslogOperations();
bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredByteCount();
bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes();
} else {
bytes = lastBytes = 0;
translogOps = lastTranslogOps = 0;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.gateway;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.count.CountResponse;
@ -31,12 +30,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ElasticsearchIntegrationTest.ClusterScope(numDataNodes = 0, scope = ElasticsearchIntegrationTest.Scope.TEST, numClientNodes = 0, transportClientRatio = 0.0)
public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsCompatIntegrationTest {
@ -101,15 +100,15 @@ public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsC
RecoveryState recoveryState = response.recoveryState();
if (!recoveryState.getPrimary()) {
RecoveryState.Index index = recoveryState.getIndex();
assertThat(index.toString(), index.recoveredByteCount(), equalTo(0l));
assertThat(index.toString(), index.reusedByteCount(), greaterThan(0l));
assertThat(index.toString(), index.reusedByteCount(), equalTo(index.totalByteCount()));
assertThat(index.toString(), index.recoveredBytes(), equalTo(0l));
assertThat(index.toString(), index.reusedBytes(), greaterThan(0l));
assertThat(index.toString(), index.reusedBytes(), equalTo(index.totalBytes()));
assertThat(index.toString(), index.recoveredFileCount(), equalTo(0));
assertThat(index.toString(), index.reusedFileCount(), equalTo(index.totalFileCount()));
assertThat(index.toString(), index.reusedFileCount(), greaterThan(0));
assertThat(index.toString(), index.percentBytesRecovered(), equalTo(0.f));
assertThat(index.toString(), index.percentFilesRecovered(), equalTo(0.f));
assertThat(index.toString(), index.reusedByteCount(), greaterThan(index.numberOfRecoveredBytes()));
assertThat(index.toString(), index.recoveredBytesPercent(), equalTo(100.f));
assertThat(index.toString(), index.recoveredFilesPercent(), equalTo(100.f));
assertThat(index.toString(), index.reusedBytes(), greaterThan(index.recoveredBytes()));
// TODO upgrade via optimize?
}
}

View File

@ -401,18 +401,18 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
if (!recoveryState.getPrimary()) {
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredByteCount(), equalTo(0l));
assertThat("data should have been reused", recoveryState.getIndex().reusedByteCount(), greaterThan(0l));
assertThat("all bytes should be reused", recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(0l));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l));
assertThat("all bytes should be reused", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat("all bytes should be reused bytes",
recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
} else {
assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(recoveryState.getIndex().reusedByteCount()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(recoveryState.getIndex().reusedFileCount()));
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
}
}

View File

@ -424,10 +424,10 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
private void validateIndexRecoveryState(RecoveryState.Index indexState) {
assertThat(indexState.time(), greaterThanOrEqualTo(0L));
assertThat(indexState.percentFilesRecovered(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.percentFilesRecovered(), lessThanOrEqualTo(100.0f));
assertThat(indexState.percentBytesRecovered(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.percentBytesRecovered(), lessThanOrEqualTo(100.0f));
assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
assertThat(indexState.recoveredBytesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
}
@Test

View File

@ -18,43 +18,168 @@
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.indices.recovery.RecoveryState.File;
import org.elasticsearch.test.ElasticsearchTestCase;
import static org.hamcrest.Matchers.closeTo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.*;
public class RecoveryStateTest extends ElasticsearchTestCase {
public void testPercentage() {
RecoveryState state = new RecoveryState();
RecoveryState.Index index = state.getIndex();
index.totalByteCount(100);
index.reusedByteCount(20);
index.recoveredByteCount(80);
assertThat((double)index.percentBytesRecovered(), closeTo(80.0d, 0.1d));
abstract class Streamer<T extends Streamable> extends Thread {
index.totalFileCount(100);
index.reusedFileCount(80);
index.recoveredFileCount(20);
assertThat((double)index.percentFilesRecovered(), closeTo(20.0d, 0.1d));
private T lastRead;
final private AtomicBoolean shouldStop;
final private T source;
final AtomicReference<Throwable> error = new AtomicReference<>();
index.totalByteCount(0);
index.reusedByteCount(0);
index.recoveredByteCount(0);
assertThat((double)index.percentBytesRecovered(), closeTo(0d, 0.1d));
// @param shouldStop flag polled by run(); flipping it to true ends the background loop
// @param source the live Streamable that is repeatedly serialized while the test mutates it
Streamer(AtomicBoolean shouldStop, T source) {
    this.shouldStop = shouldStop;
    this.source = source;
}
index.totalFileCount(0);
index.reusedFileCount(0);
index.recoveredFileCount(0);
assertThat((double)index.percentFilesRecovered(), closeTo(00.0d, 0.1d));
// Round-trips `source` through its wire format and stores the freshly
// deserialized copy in `lastRead` for later inspection by the test thread.
void serializeDeserialize() throws IOException {
    BytesStreamOutput out = new BytesStreamOutput();
    source.writeTo(out);
    out.close();
    StreamInput in = new BytesStreamInput(out.bytes());
    lastRead = deserialize(in);
}
index.totalByteCount(10);
index.reusedByteCount(0);
index.recoveredByteCount(10);
assertThat((double)index.percentBytesRecovered(), closeTo(100d, 0.1d));
abstract T deserialize(StreamInput in) throws IOException;
index.totalFileCount(20);
index.reusedFileCount(0);
index.recoveredFileCount(20);
assertThat((double)index.percentFilesRecovered(), closeTo(100.0d, 0.1d));
@Override
public void run() {
    try {
        // Hammer serialize/deserialize concurrently with the mutating test
        // thread until asked to stop — exercises thread-safety of writeTo.
        while (shouldStop.get() == false) {
            serializeDeserialize();
        }
        // One final pass after the stop signal so `lastRead` reflects the
        // fully settled, post-recovery state.
        serializeDeserialize();
    } catch (Throwable t) {
        error.set(t); // surfaced and asserted on by the main test thread
    }
}
}
// End-to-end unit test of RecoveryState.Index: builds a random mix of reused and
// to-be-recovered files, drives a (possibly partial) recovery while a background
// thread concurrently serializes the state, then verifies totals, percentages and
// the serialized copy.
public void testIndex() throws Exception {
    File[] files = new File[randomIntBetween(1, 20)];
    ArrayList<File> filesToRecover = new ArrayList<>();
    long totalFileBytes = 0;
    long totalReusedBytes = 0;
    int totalReused = 0;
    // Build a random file set; "reused" files are local matches that need no copying.
    for (int i = 0; i < files.length; i++) {
        final int fileLength = randomIntBetween(1, 1000);
        final boolean reused = randomBoolean();
        totalFileBytes += fileLength;
        files[i] = new RecoveryState.File("f_" + i, fileLength, reused);
        if (reused) {
            totalReused++;
            totalReusedBytes += fileLength;
        } else {
            filesToRecover.add(files[i]);
        }
    }
    // Registration order must not matter.
    Collections.shuffle(Arrays.asList(files));
    final RecoveryState.Index index = new RecoveryState.Index();
    final long startTime = System.currentTimeMillis();
    // before we start we must report 0
    assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0));
    assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0));
    index.startTime(startTime);
    for (File file : files) {
        index.addFileDetail(file.name(), file.length(), file.reused());
    }
    logger.info("testing initial information");
    assertThat(index.totalBytes(), equalTo(totalFileBytes));
    assertThat(index.reusedBytes(), equalTo(totalReusedBytes));
    assertThat(index.totalRecoverBytes(), equalTo(totalFileBytes - totalReusedBytes));
    assertThat(index.totalFileCount(), equalTo(files.length));
    assertThat(index.reusedFileCount(), equalTo(totalReused));
    assertThat(index.totalRecoverFiles(), equalTo(filesToRecover.size()));
    assertThat(index.recoveredFileCount(), equalTo(0));
    assertThat(index.recoveredBytes(), equalTo(0l));
    // When nothing needs recovering, progress is immediately 100%.
    assertThat(index.recoveredFilesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f));
    assertThat(index.recoveredBytesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f));
    assertThat(index.startTime(), equalTo(startTime));
    // Randomly stop short of full recovery to exercise partial-progress reporting.
    long bytesToRecover = totalFileBytes - totalReusedBytes;
    boolean completeRecovery = bytesToRecover == 0 || randomBoolean();
    if (completeRecovery == false) {
        bytesToRecover = randomIntBetween(1, (int) bytesToRecover);
        logger.info("performing partial recovery ([{}] bytes of [{}])", bytesToRecover, totalFileBytes - totalReusedBytes);
    }
    // Background thread serializes the index concurrently with the updates below,
    // guarding against the concurrency issues this class previously had.
    AtomicBoolean streamShouldStop = new AtomicBoolean();
    Streamer<RecoveryState.Index> backgroundReader = new Streamer<RecoveryState.Index>(streamShouldStop, index) {
        @Override
        RecoveryState.Index deserialize(StreamInput in) throws IOException {
            RecoveryState.Index index = new RecoveryState.Index();
            index.readFrom(in);
            return index;
        }
    };
    backgroundReader.start();
    long recoveredBytes = 0;
    // Feed random-sized chunks to random files until the byte budget is spent.
    while (bytesToRecover > 0) {
        File file = randomFrom(filesToRecover);
        long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered())));
        index.addRecoveredBytesToFile(file.name(), toRecover);
        file.addRecoveredBytes(toRecover);
        bytesToRecover -= toRecover;
        recoveredBytes += toRecover;
        if (file.reused() || file.fullyRecovered()) {
            filesToRecover.remove(file);
        }
    }
    if (completeRecovery) {
        assertThat(filesToRecover.size(), equalTo(0));
        long time = System.currentTimeMillis();
        index.stopTime(time);
        // Math.max guards against clock adjustments between start and stop.
        assertThat(index.time(), equalTo(Math.max(0, time - startTime)));
    }
    logger.info("testing serialized information");
    streamShouldStop.set(true);
    backgroundReader.join();
    // The last serialized snapshot (taken after the stop signal) must match the live state.
    assertThat(backgroundReader.lastRead.fileDetails().toArray(), arrayContainingInAnyOrder(index.fileDetails().toArray()));
    assertThat(backgroundReader.lastRead.startTime(), equalTo(index.startTime()));
    assertThat(backgroundReader.lastRead.time(), equalTo(index.time()));
    logger.info("testing post recovery");
    assertThat(index.totalBytes(), equalTo(totalFileBytes));
    assertThat(index.reusedBytes(), equalTo(totalReusedBytes));
    assertThat(index.totalRecoverBytes(), equalTo(totalFileBytes - totalReusedBytes));
    assertThat(index.totalFileCount(), equalTo(files.length));
    assertThat(index.reusedFileCount(), equalTo(totalReused));
    assertThat(index.totalRecoverFiles(), equalTo(files.length - totalReused));
    assertThat(index.recoveredFileCount(), equalTo(index.totalRecoverFiles() - filesToRecover.size()));
    assertThat(index.recoveredBytes(), equalTo(recoveredBytes));
    if (index.totalRecoverFiles() == 0) {
        // Nothing to recover: percentages are defined as 100%, not 0/0.
        assertThat((double) index.recoveredFilesPercent(), equalTo(100.0));
        assertThat((double) index.recoveredBytesPercent(), equalTo(100.0));
    } else {
        assertThat((double) index.recoveredFilesPercent(), closeTo(100.0 * index.recoveredFileCount() / index.totalRecoverFiles(), 0.1));
        assertThat((double) index.recoveredBytesPercent(), closeTo(100.0 * index.recoveredBytes() / index.totalRecoverBytes(), 0.1));
    }
    assertThat(index.startTime(), equalTo(startTime));
}
}

View File

@ -38,11 +38,10 @@ public class RecoveryStatusTests extends ElasticsearchSingleNodeTest {
public void testRenameTempFiles() throws IOException {
IndexService service = createIndex("foo");
RecoveryState state = new RecoveryState();
IndexShard indexShard = service.shard(0);
DiscoveryNode node = new DiscoveryNode("foo", new LocalTransportAddress("bar"), Version.CURRENT);
RecoveryStatus status = new RecoveryStatus(indexShard, node, state, new RecoveryTarget.RecoveryListener() {
RecoveryStatus status = new RecoveryStatus(indexShard, node, new RecoveryTarget.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}

View File

@ -116,9 +116,8 @@ public class RecoveriesCollectionTests extends ElasticsearchSingleNodeTest {
long startRecovery(RecoveriesCollection collection, RecoveryTarget.RecoveryListener listener, TimeValue timeValue) {
IndicesService indexServices = getInstanceFromNode(IndicesService.class);
IndexShard indexShard = indexServices.indexServiceSafe("test").shard(0);
return collection.startRecovery(
indexShard, new DiscoveryNode("id", DummyTransportAddress.INSTANCE, Version.CURRENT),
new RecoveryState(indexShard.shardId()), listener, timeValue);
final DiscoveryNode sourceNode = new DiscoveryNode("id", DummyTransportAddress.INSTANCE, Version.CURRENT);
return collection.startRecovery(indexShard, sourceNode, listener, timeValue);
}
}

View File

@ -25,7 +25,6 @@ import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
@ -568,7 +567,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
IntSet reusedShards = IntOpenHashSet.newInstance();
for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) {
if (response.recoveryState().getIndex().reusedByteCount() > 0) {
if (response.recoveryState().getIndex().reusedBytes() > 0) {
reusedShards.add(response.getShardId());
}
}

View File

@ -24,12 +24,12 @@ import org.elasticsearch.common.logging.Loggers;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Represents a gt assert section:
*
* - gt: { fields._ttl: 0}
*
* <p/>
* - gt: { fields._ttl: 0}
*/
public class GreaterThanAssertion extends Assertion {
@ -42,10 +42,14 @@ public class GreaterThanAssertion extends Assertion {
@Override
@SuppressWarnings("unchecked")
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] is greater than [{}]", actualValue, expectedValue);
assertThat(actualValue, instanceOf(Comparable.class));
assertThat(expectedValue, instanceOf(Comparable.class));
assertThat(errorMessage(), (Comparable)actualValue, greaterThan((Comparable) expectedValue));
logger.trace("assert that [{}] is greater than [{}] (field: [{}])", actualValue, expectedValue, getField());
assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
try {
assertThat(errorMessage(), (Comparable) actualValue, greaterThan((Comparable) expectedValue));
} catch (ClassCastException e) {
fail("cast error while checking (" + errorMessage() + "): " + e);
}
}
private String errorMessage() {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.logging.Loggers;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Represents a gte assert section:
@ -41,10 +42,14 @@ public class GreaterThanEqualToAssertion extends Assertion {
@Override
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] is greater than or equal to [{}]", actualValue, expectedValue);
assertThat(actualValue, instanceOf(Comparable.class));
assertThat(expectedValue, instanceOf(Comparable.class));
assertThat(errorMessage(), (Comparable)actualValue, greaterThanOrEqualTo((Comparable) expectedValue));
logger.trace("assert that [{}] is greater than or equal to [{}] (field: [{}])", actualValue, expectedValue, getField());
assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
try {
assertThat(errorMessage(), (Comparable) actualValue, greaterThanOrEqualTo((Comparable) expectedValue));
} catch (ClassCastException e) {
fail("cast error while checking (" + errorMessage() + "): " + e);
}
}
private String errorMessage() {

View File

@ -41,7 +41,7 @@ public class IsFalseAssertion extends Assertion {
@Override
@SuppressWarnings("unchecked")
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] doesn't have a true value", actualValue);
logger.trace("assert that [{}] doesn't have a true value (field: [{}])", actualValue, getField());
if (actualValue == null) {
return;

View File

@ -40,7 +40,7 @@ public class IsTrueAssertion extends Assertion {
@Override
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] has a true value", actualValue);
logger.trace("assert that [{}] has a true value (field [{}])", actualValue, getField());
String errorMessage = errorMessage();
assertThat(errorMessage, actualValue, notNullValue());
String actualString = actualValue.toString();

View File

@ -30,9 +30,8 @@ import static org.junit.Assert.assertThat;
/**
* Represents a length assert section:
*
* - length: { hits.hits: 1 }
*
* <p/>
* - length: { hits.hits: 1 }
*/
public class LengthAssertion extends Assertion {
@ -44,8 +43,8 @@ public class LengthAssertion extends Assertion {
@Override
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] has length [{}]", actualValue, expectedValue);
assertThat(expectedValue, instanceOf(Number.class));
logger.trace("assert that [{}] has length [{}] (field: [{}])", actualValue, expectedValue, getField());
assertThat("expected value of [" + getField() + "] is not numeric (got [" + expectedValue.getClass() + "]", expectedValue, instanceOf(Number.class));
int length = ((Number) expectedValue).intValue();
if (actualValue instanceof String) {
assertThat(errorMessage(), ((String) actualValue).length(), equalTo(length));

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.logging.Loggers;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Represents a lt assert section:
@ -42,10 +43,14 @@ public class LessThanAssertion extends Assertion {
@Override
@SuppressWarnings("unchecked")
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] is less than [{}]", actualValue, expectedValue);
assertThat(actualValue, instanceOf(Comparable.class));
assertThat(expectedValue, instanceOf(Comparable.class));
assertThat(errorMessage(), (Comparable)actualValue, lessThan((Comparable)expectedValue));
logger.trace("assert that [{}] is less than [{}] (field: [{}])", actualValue, expectedValue, getField());
assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
try {
assertThat(errorMessage(), (Comparable) actualValue, lessThan((Comparable) expectedValue));
} catch (ClassCastException e) {
fail("cast error while checking (" + errorMessage() + "): " + e);
}
}
private String errorMessage() {

View File

@ -22,9 +22,10 @@ package org.elasticsearch.test.rest.section;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* Represents a lte assert section:
@ -41,10 +42,14 @@ public class LessThanOrEqualToAssertion extends Assertion {
@Override
protected void doAssert(Object actualValue, Object expectedValue) {
logger.trace("assert that [{}] is less than or equal to [{}]", actualValue, expectedValue);
assertThat(actualValue, instanceOf(Comparable.class));
assertThat(expectedValue, instanceOf(Comparable.class));
assertThat(errorMessage(), (Comparable)actualValue, lessThanOrEqualTo((Comparable) expectedValue));
logger.trace("assert that [{}] is less than or equal to [{}] (field: [{}])", actualValue, expectedValue, getField());
assertThat("value of [" + getField() + "] is not comparable (got [" + actualValue.getClass() + "])", actualValue, instanceOf(Comparable.class));
assertThat("expected value of [" + getField() + "] is not comparable (got [" + expectedValue.getClass() + "])", expectedValue, instanceOf(Comparable.class));
try {
assertThat(errorMessage(), (Comparable) actualValue, lessThanOrEqualTo((Comparable) expectedValue));
} catch (ClassCastException e) {
fail("cast error while checking (" + errorMessage() + "): " + e);
}
}
private String errorMessage() {

View File

@ -24,9 +24,7 @@ import org.elasticsearch.common.logging.Loggers;
import java.util.regex.Pattern;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
/**
@ -61,7 +59,7 @@ public class MatchAssertion extends Assertion {
}
assertThat(errorMessage(), actualValue, notNullValue());
logger.trace("assert that [{}] matches [{}]", actualValue, expectedValue);
logger.trace("assert that [{}] matches [{}] (field [{}])", actualValue, expectedValue, getField());
if (!actualValue.getClass().equals(expectedValue.getClass())) {
if (actualValue instanceof Number && expectedValue instanceof Number) {
//Double 1.0 is equal to Integer 1