Recovery API

Adds a new API endpoint at /_recovery as well as to the Java API. The
recovery API allows one to see the recovery status of all shards in the
cluster. It will report on percent complete, recovery type, and which
files are copied.

Closes #4637
This commit is contained in:
Andrew Selden 2014-02-18 14:28:49 -08:00
parent 8f6e1d4720
commit 89e45fde9c
35 changed files with 2448 additions and 501 deletions

View File

@@ -46,6 +46,7 @@ and warmers.
* <<indices-status>>
* <<indices-stats>>
* <<indices-segments>>
* <<indices-recovery>>
[float]
[[status-management]]
@@ -94,6 +95,8 @@ include::indices/stats.asciidoc[]
include::indices/segments.asciidoc[]
include::indices/recovery.asciidoc[]
include::indices/clearcache.asciidoc[]
include::indices/flush.asciidoc[]

View File

@@ -0,0 +1,194 @@
[[indices-recovery]]
== Indices Recovery
The indices recovery API provides insight into ongoing shard recoveries.
Recovery status may be reported for specific indices or cluster-wide.
For example, the following command shows recovery information for the indices "index1" and "index2":
[source,js]
--------------------------------------------------
curl -XGET http://localhost:9200/index1,index2/_recovery?pretty=true
--------------------------------------------------
To see cluster-wide recovery status, simply omit the index names:
[source,js]
--------------------------------------------------
curl -XGET http://localhost:9200/_recovery?pretty=true
--------------------------------------------------
Response:
[source,js]
--------------------------------------------------
{
"index1" : {
"shards" : [ {
"id" : 0,
"type" : "snapshot",
"stage" : "index",
"primary" : true,
"start_time" : "2014-02-24T12:15:59.716",
"stop_time" : 0,
"total_time_in_millis" : 175576,
"source" : {
"repository" : "my_repository",
"snapshot" : "my_snapshot",
"index" : "index1"
},
"target" : {
"id" : "ryqJ5lO5S4-lSFbGntkEkg",
"hostname" : "my.fqdn",
"ip" : "10.0.1.7",
"name" : "my_es_node"
},
"index" : {
"files" : {
"total" : 73,
"reused" : 0,
"recovered" : 69,
"percent" : "94.5%"
},
"bytes" : {
"total" : 79063092,
"reused" : 0,
"recovered" : 68891939,
"percent" : "87.1%"
},
"total_time_in_millis" : 0
},
"translog" : {
"recovered" : 0,
"total_time_in_millis" : 0
},
"start" : {
"check_index_time" : 0,
"total_time_in_millis" : 0
}
} ]
}
}
--------------------------------------------------
The above response shows a single index recovering a single shard. In this case, the source of the recovery is a snapshot repository
and the target of the recovery is the node with name "my_es_node".
Additionally, the output shows the number and percent of files recovered, as well as the number and percent of bytes recovered.
In some cases a higher level of detail may be preferable. Setting "detailed=true" will present a list of physical files in recovery.
[source,js]
--------------------------------------------------
curl -XGET "http://localhost:9200/_recovery?pretty=true&detailed=true"
--------------------------------------------------
Response:
[source,js]
--------------------------------------------------
{
"index1" : {
"shards" : [ {
"id" : 0,
"type" : "gateway",
"stage" : "done",
"primary" : true,
"start_time" : "2014-02-24T12:38:06.349",
"stop_time" : "2014-02-24T12:38:08.464",
"total_time_in_millis" : 2115,
"source" : {
"id" : "RGMdRc-yQWWKIBM4DGvwqQ",
"hostname" : "my.fqdn",
"ip" : "10.0.1.7",
"name" : "my_es_node"
},
"target" : {
"id" : "RGMdRc-yQWWKIBM4DGvwqQ",
"hostname" : "my.fqdn",
"ip" : "10.0.1.7",
"name" : "my_es_node"
},
"index" : {
"files" : {
"total" : 26,
"reused" : 26,
"recovered" : 26,
"percent" : "100.0%",
"details" : [ {
"name" : "segments.gen",
"length" : 20,
"recovered" : 20
}, {
"name" : "_0.cfs",
"length" : 135306,
"recovered" : 135306
}, {
"name" : "segments_2",
"length" : 251,
"recovered" : 251
},
...
]
},
"bytes" : {
"total" : 26001617,
"reused" : 26001617,
"recovered" : 26001617,
"percent" : "100.0%"
},
"total_time_in_millis" : 2
},
"translog" : {
"recovered" : 71,
"total_time_in_millis" : 2025
},
"start" : {
"check_index_time" : 0,
"total_time_in_millis" : 88
}
} ]
}
}
--------------------------------------------------
This response shows a detailed listing (truncated for brevity) of the actual files recovered and their sizes.
Also shown are the timings in milliseconds of the various stages of recovery: index retrieval, translog replay, and index start time.
Note that the above listing indicates that the recovery is in stage "done". All recoveries, whether ongoing or complete, are kept in
cluster state and may be reported on at any time. Setting "active_only=true" will cause only ongoing recoveries to be reported.
Here is a complete list of options:
[horizontal]
`detailed`:: Display a detailed view. This is primarily useful for viewing the recovery of physical index files. Default: false.
`active_only`:: Display only those recoveries that are currently ongoing. Default: false.
Description of output fields:
[horizontal]
`id`:: Shard ID
`type`:: Recovery type:
* gateway
* snapshot
* replica
* relocating
`stage`:: Recovery stage:
* init: Recovery has not started
* index: Reading index meta-data and copying bytes from source to destination
* start: Starting the engine; opening the index for use
* translog: Replaying transaction log
* finalize: Cleanup
* done: Complete
`primary`:: True if shard is primary, false otherwise
`start_time`:: Timestamp of recovery start
`stop_time`:: Timestamp of recovery finish
`total_time_in_millis`:: Total time to recover shard in milliseconds
`source`:: Recovery source:
* repository description if recovery is from a snapshot
* description of source node otherwise
`target`:: Destination node
`index`:: Statistics about physical index recovery
`translog`:: Statistics about translog recovery
`start`:: Statistics about time to open and start the index
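The `percent` values in the `index` section are simply recovered-over-total, formatted to one decimal place. A minimal sketch of that computation, using the numbers from the example responses above (an illustration, not the actual Elasticsearch implementation):

```java
import java.util.Locale;

public class RecoveryPercent {
    // Format recovered/total as a percentage with one decimal place,
    // matching the style of the "percent" fields shown above.
    static String percent(long recovered, long total) {
        if (total == 0) {
            return "0.0%";
        }
        return String.format(Locale.ROOT, "%.1f%%", 100.0 * recovered / total);
    }

    public static void main(String[] args) {
        // Values from the first example response:
        // 68891939 of 79063092 bytes and 69 of 73 files recovered.
        System.out.println(percent(68891939L, 79063092L)); // prints 87.1%
        System.out.println(percent(69, 73));               // prints 94.5%
    }
}
```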

View File

@@ -17,10 +17,6 @@
"description" : "The unit in which to display byte values",
"options": [ "b", "k", "m", "g" ]
},
"local": {
"type" : "boolean",
"description" : "Return local information, do not retrieve the state from master node (default: false)"
},
"master_timeout": {
"type" : "time",
"description" : "Explicit operation timeout for connection to master node"

View File

@@ -0,0 +1,34 @@
{
"indices.recovery" : {
"documentation": "http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/indices-recovery.html",
"methods": ["GET"],
"url": {
"path": "/_recovery",
"paths": ["/_recovery", "/{index}/_recovery"],
"parts": {
"index": {
"type" : "list",
"description" : "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
}
},
"params": {
"detailed" : {
"type": "boolean",
"description": "Whether to display detailed information about shard recovery",
"default": false
},
"active_only" : {
"type": "boolean",
"description": "Display only those recoveries that are currently on-going",
"default": false
},
"human": {
"type": "boolean",
"description": "Whether to return time and byte values in human-readable format.",
"default": false
}
}
},
"body": null
}
}
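The spec's `paths` entry uses an `{index}` placeholder that clients fill with a comma-separated index list. A hedged sketch of that substitution (`expand` is a hypothetical helper, not part of the spec tooling):

```java
public class PathTemplate {
    // Substitute the {index} placeholder in a REST path template with a
    // comma-separated list of index names, as seen in /{index}/_recovery.
    static String expand(String template, String... indices) {
        return template.replace("{index}", String.join(",", indices));
    }

    public static void main(String[] args) {
        System.out.println(expand("/{index}/_recovery", "index1", "index2"));
        // prints /index1,index2/_recovery
    }
}
```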

View File

@@ -0,0 +1,26 @@
---
"Test cat recovery output":
- do:
cat.recovery: {}
- match:
$body: >
/^$/
- do:
index:
index: index1
type: type1
id: 1
body: { foo: bar }
refresh: true
- do:
cluster.health:
wait_for_status: yellow
- do:
cat.recovery: {}
- match:
$body: >
/^(index1\s+\d+\s+\d+\s+(gateway|replica|snapshot|relocating)\s+(init|index|start|translog|finalize|done)\s+([a-zA-Z_0-9/.])+\s+([a-zA-Z_0-9/.])+\s+([a-zA-Z_0-9/.])+\s+([a-zA-Z_0-9/.])+\s+\d+\s+\d+\.\d+\%\s+\d+\s+\d+\.\d+\%\s+\n?){1,}$/

View File

@@ -0,0 +1,32 @@
---
"Indices recovery test":
- skip:
features: gtelte
- do:
indices.create:
index: test_1
- do:
indices.recovery:
index: [test_1]
- match: { test_1.shards.0.type: "GATEWAY" }
- match: { test_1.shards.0.stage: "DONE" }
- match: { test_1.shards.0.primary: true }
- match: { test_1.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_1.shards.0.index.files.total: 0 }
- gte: { test_1.shards.0.index.files.reused: 0 }
- gte: { test_1.shards.0.index.files.recovered: 0 }
- match: { test_1.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.bytes.total: 0 }
- gte: { test_1.shards.0.index.bytes.reused: 0 }
- gte: { test_1.shards.0.index.bytes.recovered: 0 }
- match: { test_1.shards.0.index.bytes.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.translog.recovered: 0 }
- gte: { test_1.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_1.shards.0.start.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.start.total_time_in_millis: 0 }

View File

@@ -95,6 +95,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeAction;
import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@@ -284,6 +286,7 @@ public class ActionModule extends AbstractModule {
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
// register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder

View File

@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.action.admin.indices.IndicesAction;
/**
* Recovery information action
*/
public class RecoveryAction extends IndicesAction<RecoveryRequest, RecoveryResponse, RecoveryRequestBuilder> {
public static final RecoveryAction INSTANCE = new RecoveryAction();
public static final String NAME = "indices/recovery";
private RecoveryAction() {
super(NAME);
}
@Override
public RecoveryRequestBuilder newRequestBuilder(IndicesAdminClient client) {
return new RecoveryRequestBuilder(client);
}
@Override
public RecoveryResponse newResponse() {
return new RecoveryResponse();
}
}

View File

@@ -0,0 +1,104 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import java.io.IOException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
/**
* Request for recovery information
*/
public class RecoveryRequest extends BroadcastOperationRequest<RecoveryRequest> {
private boolean detailed = false; // Provides extra details in the response
private boolean activeOnly = false; // Only reports on active recoveries
/**
* Constructs a request for recovery information for all shards
*/
public RecoveryRequest() {
this(Strings.EMPTY_ARRAY);
}
/**
* Constructs a request for recovery information for all shards for the given indices
*
* @param indices Comma-separated list of indices about which to gather recovery information
*/
public RecoveryRequest(String... indices) {
super(indices);
}
/**
* True if the detailed flag is set, false otherwise. This value is false by default.
*
* @return True if detailed flag is set, false otherwise
*/
public boolean detailed() {
return detailed;
}
/**
* Set value of the detailed flag. Detailed requests will contain extra
* information such as a list of physical files and their recovery progress.
*
* @param detailed Whether or not to set the detailed flag
*/
public void detailed(boolean detailed) {
this.detailed = detailed;
}
/**
* True if activeOnly flag is set, false otherwise. This value is false by default.
*
* @return True if activeOnly flag is set, false otherwise
*/
public boolean activeOnly() {
return activeOnly;
}
/**
* Set value of the activeOnly flag. If true, this request will only respond with
* ongoing recovery information.
*
* @param activeOnly Whether or not to set the activeOnly flag.
*/
public void activeOnly(boolean activeOnly) {
this.activeOnly = activeOnly;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(activeOnly);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
activeOnly = in.readBoolean();
}
}
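`writeTo` and `readFrom` append the two flags after the superclass state and read them back in the same order. An analogous round-trip using plain `java.io` streams (a simplified stand-in for Elasticsearch's `StreamOutput`/`StreamInput`, shown only to illustrate the ordering contract):

```java
import java.io.*;

public class FlagRoundTrip {
    // Mirrors RecoveryRequest.writeTo: the two booleans are appended
    // in a fixed order (superclass state omitted for brevity).
    static byte[] write(boolean detailed, boolean activeOnly) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(detailed);
        out.writeBoolean(activeOnly);
        return bytes.toByteArray();
    }

    // Mirrors RecoveryRequest.readFrom: the flags must be read back
    // in exactly the order they were written.
    static boolean[] read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        return new boolean[] { in.readBoolean(), in.readBoolean() };
    }

    public static void main(String[] args) throws IOException {
        boolean[] flags = read(write(true, false));
        System.out.println(flags[0] + " " + flags[1]); // prints true false
    }
}
```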

View File

@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
/**
* Recovery information request builder.
*/
public class RecoveryRequestBuilder extends BroadcastOperationRequestBuilder<RecoveryRequest, RecoveryResponse, RecoveryRequestBuilder> {
/**
* Constructs a new recovery information request builder.
*
* @param indicesClient Indices admin client
*/
public RecoveryRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new RecoveryRequest());
}
@Override
protected void doExecute(ActionListener<RecoveryResponse> listener) {
((IndicesAdminClient) client).recoveries(request, listener);
}
public RecoveryRequestBuilder setDetailed(boolean detailed) {
request.detailed(detailed);
return this;
}
public RecoveryRequestBuilder setActiveOnly(boolean activeOnly) {
request.activeOnly(activeOnly);
return this;
}
}
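`setDetailed` and `setActiveOnly` follow the fluent-builder convention: mutate the wrapped request and return `this` so calls chain. The pattern in isolation (simplified stand-in classes, not the Elasticsearch types):

```java
public class FluentFlags {
    // Stand-in for RecoveryRequest: just the two flags the builder sets.
    static class Request {
        boolean detailed;
        boolean activeOnly;
    }

    // Stand-in for RecoveryRequestBuilder: each setter mutates the
    // wrapped request and returns this, enabling method chaining.
    static class Builder {
        final Request request = new Request();

        Builder setDetailed(boolean detailed) {
            request.detailed = detailed;
            return this;
        }

        Builder setActiveOnly(boolean activeOnly) {
            request.activeOnly = activeOnly;
            return this;
        }
    }

    public static void main(String[] args) {
        Request r = new Builder().setDetailed(true).setActiveOnly(true).request;
        System.out.println(r.detailed + " " + r.activeOnly); // prints true true
    }
}
```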

View File

@@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
/**
* Information regarding the recovery state of indices and their associated shards.
*/
public class RecoveryResponse extends BroadcastOperationResponse implements ToXContent {
private boolean detailed = false;
private Map<String, List<ShardRecoveryResponse>> shardResponses = new HashMap<String, List<ShardRecoveryResponse>>();
public RecoveryResponse() { }
/**
* Constructs recovery information for a collection of indices and associated shards. Keeps track of how many total shards
* were seen, and out of those how many were successfully processed and how many failed.
*
* @param totalShards Total count of shards seen
* @param successfulShards Count of shards successfully processed
* @param failedShards Count of shards which failed to process
* @param detailed Display detailed metrics
* @param shardResponses Map of indices to shard recovery information
* @param shardFailures List of failures processing shards
*/
public RecoveryResponse(int totalShards, int successfulShards, int failedShards, boolean detailed,
Map<String, List<ShardRecoveryResponse>> shardResponses, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shardResponses = shardResponses;
this.detailed = detailed;
}
public boolean hasRecoveries() {
return shardResponses.size() > 0;
}
public void addShardRecovery(String index, ShardRecoveryResponse shardRecoveryResponse) {
List<ShardRecoveryResponse> shardRecoveries = shardResponses.get(index);
if (shardRecoveries == null) {
shardRecoveries = new ArrayList<ShardRecoveryResponse>();
shardResponses.put(index, shardRecoveries);
}
shardRecoveries.add(shardRecoveryResponse);
}
public boolean detailed() {
return detailed;
}
public void detailed(boolean detailed) {
this.detailed = detailed;
}
public Map<String, List<ShardRecoveryResponse>> shardResponses() {
return shardResponses;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (String index : shardResponses.keySet()) {
List<ShardRecoveryResponse> responses = shardResponses.get(index);
if (responses == null || responses.size() == 0) {
continue;
}
builder.startObject(index);
builder.startArray("shards");
for (ShardRecoveryResponse recoveryResponse : responses) {
builder.startObject();
recoveryResponse.detailed(this.detailed);
recoveryResponse.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
builder.endObject();
}
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shardResponses.size());
for (Map.Entry<String, List<ShardRecoveryResponse>> entry : shardResponses.entrySet()) {
out.writeString(entry.getKey());
out.writeVInt(entry.getValue().size());
for (ShardRecoveryResponse recoveryResponse : entry.getValue()) {
recoveryResponse.writeTo(out);
}
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
String s = in.readString();
int listSize = in.readVInt();
List<ShardRecoveryResponse> list = new ArrayList<ShardRecoveryResponse>(listSize);
for (int j = 0; j < listSize; j++) {
list.add(ShardRecoveryResponse.readShardRecoveryResponse(in));
}
shardResponses.put(s, list);
}
}
}
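`addShardRecovery` groups shard entries under their index name, creating each per-index list lazily on first use. The same pattern in isolation (simplified types; shard ids stand in for `ShardRecoveryResponse`):

```java
import java.util.*;

public class ShardGrouping {
    // Group shard ids by index name, creating each list on first use,
    // as RecoveryResponse.addShardRecovery does for its shard responses.
    static Map<String, List<Integer>> group(String[][] entries) {
        Map<String, List<Integer>> byIndex = new HashMap<>();
        for (String[] entry : entries) {
            String index = entry[0];
            int shardId = Integer.parseInt(entry[1]);
            byIndex.computeIfAbsent(index, k -> new ArrayList<>()).add(shardId);
        }
        return byIndex;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = group(new String[][] {
            { "index1", "0" }, { "index1", "1" }, { "index2", "0" }
        });
        System.out.println(grouped.get("index1")); // prints [0, 1]
    }
}
```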

View File

@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
/**
* Information regarding the recovery state of a shard.
*/
public class ShardRecoveryResponse extends BroadcastShardOperationResponse implements ToXContent {
RecoveryState recoveryState;
private boolean detailed = false;
public ShardRecoveryResponse() { }
/**
* Constructs shard recovery information for the given index and shard id.
*
* @param index Name of the index
* @param shardId Id of the shard
*/
public ShardRecoveryResponse(String index, int shardId) {
super(index, shardId);
}
/**
* Sets the recovery state information for the shard.
*
* @param recoveryState Recovery state
*/
public void recoveryState(RecoveryState recoveryState) {
this.recoveryState = recoveryState;
}
/**
* Gets the recovery state information for the shard.
*
* @return Recovery state
*/
public RecoveryState recoveryState() {
return recoveryState;
}
public boolean detailed() {
return detailed;
}
public void detailed(boolean detailed) {
this.detailed = detailed;
this.recoveryState.setDetailed(detailed);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
recoveryState.toXContent(builder, params);
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
recoveryState.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryState = RecoveryState.readRecoveryState(in);
}
/**
* Builds a new ShardRecoveryResponse from the given input stream.
*
* @param in Input stream
* @return A new ShardRecoveryResponse
* @throws IOException
*/
public static ShardRecoveryResponse readShardRecoveryResponse(StreamInput in) throws IOException {
ShardRecoveryResponse response = new ShardRecoveryResponse();
response.readFrom(in);
return response;
}
}

View File

@@ -0,0 +1,208 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* Transport action for shard recovery operation. This transport action does not actually
* perform shard recovery, it only reports on recoveries (both active and complete).
*/
public class TransportRecoveryAction extends
TransportBroadcastOperationAction<RecoveryRequest, RecoveryResponse, TransportRecoveryAction.ShardRecoveryRequest, ShardRecoveryResponse> {
private final IndicesService indicesService;
private final RecoveryTarget recoveryTarget;
@Inject
public TransportRecoveryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, RecoveryTarget recoveryTarget) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.recoveryTarget = recoveryTarget;
}
@Override
protected String transportAction() {
return RecoveryAction.NAME;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected RecoveryRequest newRequest() {
return new RecoveryRequest();
}
@Override
protected RecoveryResponse newResponse(RecoveryRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
Map<String, List<ShardRecoveryResponse>> shardResponses = new HashMap<String, List<ShardRecoveryResponse>>();
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<ShardOperationFailedException>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
ShardRecoveryResponse recoveryResponse = (ShardRecoveryResponse) shardResponse;
successfulShards++;
String indexName = recoveryResponse.getIndex();
List<ShardRecoveryResponse> responses = shardResponses.get(indexName);
if (responses == null) {
responses = new ArrayList<ShardRecoveryResponse>();
shardResponses.put(indexName, responses);
}
if (request.activeOnly()) {
if (recoveryResponse.recoveryState().getStage() != RecoveryState.Stage.DONE) {
responses.add(recoveryResponse);
}
} else {
responses.add(recoveryResponse);
}
}
}
RecoveryResponse response = new RecoveryResponse(shardsResponses.length(), successfulShards,
failedShards, request.detailed(), shardResponses, shardFailures);
return response;
}
@Override
protected ShardRecoveryRequest newShardRequest() {
return new ShardRecoveryRequest();
}
@Override
protected ShardRecoveryRequest newShardRequest(ShardRouting shard, RecoveryRequest request) {
return new ShardRecoveryRequest(shard.index(), shard.id(), request);
}
@Override
protected ShardRecoveryResponse newShardResponse() {
return new ShardRecoveryResponse();
}
@Override
protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) throws ElasticsearchException {
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.index());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId());
ShardRouting shardRouting = indexShard.routingEntry();
ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(shardRouting.index(), shardRouting.id());
RecoveryState state;
RecoveryStatus recoveryStatus = indexShard.recoveryStatus();
if (recoveryStatus == null) {
recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
}
if (recoveryStatus != null) {
state = recoveryStatus.recoveryState();
} else {
IndexShardGatewayService gatewayService =
indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class);
state = gatewayService.recoveryState();
}
shardRecoveryResponse.recoveryState(state);
return shardRecoveryResponse;
}
@Override
protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
}
public static class ShardRecoveryRequest extends BroadcastShardOperationRequest {
ShardRecoveryRequest() { }
ShardRecoveryRequest(String index, int shardId, RecoveryRequest request) {
super(index, shardId, request);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
}
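When `activeOnly` is set, `newResponse` skips any shard whose recovery has reached stage `DONE`. The filtering rule in isolation (a sketch with a reduced `Stage` enum, not the ES classes):

```java
import java.util.*;

public class ActiveOnlyFilter {
    enum Stage { INIT, INDEX, START, TRANSLOG, FINALIZE, DONE }

    // Mirrors the activeOnly branch in newResponse(): a shard recovery is
    // included unless activeOnly is set and the recovery is already DONE.
    static List<Stage> filter(List<Stage> stages, boolean activeOnly) {
        List<Stage> kept = new ArrayList<>();
        for (Stage stage : stages) {
            if (!activeOnly || stage != Stage.DONE) {
                kept.add(stage);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Stage> stages = Arrays.asList(Stage.INDEX, Stage.DONE, Stage.TRANSLOG);
        System.out.println(filter(stages, true));  // prints [INDEX, TRANSLOG]
        System.out.println(filter(stages, false)); // prints [INDEX, DONE, TRANSLOG]
    }
}
```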

View File

@@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.IndexShardState;
@@ -174,9 +175,9 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
if (request.recovery) {
// check on-going recovery (from peer or gateway)
RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
RecoveryStatus peerRecoveryStatus = indexShard.recoveryStatus();
if (peerRecoveryStatus == null) {
peerRecoveryStatus = peerRecoveryTarget.peerRecoveryStatus(indexShard.shardId());
peerRecoveryStatus = peerRecoveryTarget.recoveryStatus(indexShard.shardId());
}
if (peerRecoveryStatus != null) {
PeerRecoveryStatus.Stage stage;
@ -199,16 +200,18 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
default:
stage = PeerRecoveryStatus.Stage.INIT;
}
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(),
peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(),
peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations());
shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.recoveryState().getTimer().startTime(),
peerRecoveryStatus.recoveryState().getTimer().time(),
peerRecoveryStatus.recoveryState().getIndex().totalByteCount(),
peerRecoveryStatus.recoveryState().getIndex().reusedByteCount(),
peerRecoveryStatus.recoveryState().getIndex().recoveredByteCount(), peerRecoveryStatus.recoveryState().getTranslog().currentTranslogOperations());
}
IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class);
org.elasticsearch.index.gateway.RecoveryStatus gatewayRecoveryStatus = gatewayService.recoveryStatus();
if (gatewayRecoveryStatus != null) {
RecoveryState gatewayRecoveryState = gatewayService.recoveryState();
if (gatewayRecoveryState != null) {
GatewayRecoveryStatus.Stage stage;
switch (gatewayRecoveryStatus.stage()) {
switch (gatewayRecoveryState.getStage()) {
case INIT:
stage = GatewayRecoveryStatus.Stage.INIT;
break;
@ -224,8 +227,8 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
default:
stage = GatewayRecoveryStatus.Stage.INIT;
}
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(),
gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().reusedTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryState.getTimer().startTime(), gatewayRecoveryState.getTimer().time(),
gatewayRecoveryState.getIndex().totalByteCount(), gatewayRecoveryState.getIndex().reusedByteCount(), gatewayRecoveryState.getIndex().recoveredByteCount(), gatewayRecoveryState.getTranslog().currentTranslogOperations());
}
}
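Both switch statements above translate the new `RecoveryState.Stage` values back into the legacy per-shard status enums, falling back to `INIT` for anything unrecognized. A minimal self-contained sketch of that mapping (the enum and class names here are illustrative stand-ins, not the real Elasticsearch types):

```java
// Sketch of the stage translation used above: unknown stages fall back to INIT.
class StageMapping {
    // Mirrors org.elasticsearch.indices.recovery.RecoveryState.Stage
    public enum RecoveryStage { INIT, INDEX, START, TRANSLOG, DONE }
    // Stand-in for the legacy GatewayRecoveryStatus.Stage
    public enum GatewayStage { INIT, INDEX, START, TRANSLOG, DONE }

    public static GatewayStage toGatewayStage(RecoveryStage stage) {
        switch (stage) {
            case INDEX:    return GatewayStage.INDEX;
            case START:    return GatewayStage.START;
            case TRANSLOG: return GatewayStage.TRANSLOG;
            case DONE:     return GatewayStage.DONE;
            default:       return GatewayStage.INIT; // INIT and unknown -> INIT
        }
    }
}
```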


@ -87,6 +87,9 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequestBuilder;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
@ -184,6 +187,21 @@ public interface IndicesAdminClient {
*/
IndicesStatsRequestBuilder prepareStats(String... indices);
/**
* Indices recoveries
*/
ActionFuture<RecoveryResponse> recoveries(RecoveryRequest request);
/**
* Indices recoveries
*/
void recoveries(RecoveryRequest request, ActionListener<RecoveryResponse> listener);
/**
* Indices recoveries
*/
RecoveryRequestBuilder prepareRecoveries(String... indices);
/**
* The status of one or more indices.
*


@ -89,6 +89,10 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
@ -450,6 +454,21 @@ public abstract class AbstractIndicesAdminClient implements InternalIndicesAdmin
return new IndicesStatusRequestBuilder(this).setIndices(indices);
}
@Override
public ActionFuture<RecoveryResponse> recoveries(final RecoveryRequest request) {
return execute(RecoveryAction.INSTANCE, request);
}
@Override
public void recoveries(final RecoveryRequest request, final ActionListener<RecoveryResponse> listener) {
execute(RecoveryAction.INSTANCE, request, listener);
}
@Override
public RecoveryRequestBuilder prepareRecoveries(String... indices) {
return new RecoveryRequestBuilder(this).setIndices(indices);
}
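The three methods above give Java callers the same view as the REST `/_recovery` endpoint. A hypothetical usage fragment (the `client` variable, index names, and the no-argument `RecoveryRequest` constructor are assumptions; this is not runnable standalone):

```java
// Hypothetical usage against an already-connected client; mirrors the new
// IndicesAdminClient methods added in this change.
RecoveryResponse response = client.admin().indices()
        .prepareRecoveries("index1", "index2")
        .execute().actionGet();

// Cluster-wide and asynchronous, via the listener-based overload:
client.admin().indices().recoveries(new RecoveryRequest(),
        new ActionListener<RecoveryResponse>() {
            @Override
            public void onResponse(RecoveryResponse r) {
                // inspect per-shard recovery states here
            }
            @Override
            public void onFailure(Throwable t) {
                // handle the error
            }
        });
```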
@Override
public ActionFuture<IndicesSegmentResponse> segments(final IndicesSegmentsRequest request) {
return execute(IndicesSegmentsAction.INSTANCE, request);


@ -24,6 +24,7 @@ import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
/**
*
@ -35,7 +36,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
/**
* The last or on-going recovery status.
*/
RecoveryStatus recoveryStatus();
RecoveryState recoveryState();
/**
* The last snapshot status performed. Can be <tt>null</tt>.
@ -51,7 +52,7 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
/**
* Recovers the state of the shard from the gateway.
*/
void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException;
void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException;
/**
* Snapshots the given shard into the gateway.


@ -19,6 +19,7 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -35,7 +36,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
@ -53,6 +54,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final IndexSettingsService indexSettingsService;
private final ClusterService clusterService;
private final InternalIndexShard indexShard;
private final IndexShardGateway shardGateway;
@ -72,7 +75,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile ScheduledFuture snapshotScheduleFuture;
private RecoveryStatus recoveryStatus;
private RecoveryState recoveryState;
private IndexShardGateway.SnapshotLock snapshotLock;
@ -84,13 +87,15 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
@Inject
public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway, IndexShardSnapshotAndRestoreService snapshotService,
RepositoriesService repositoriesService) {
ClusterService clusterService) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.indexShard = (InternalIndexShard) indexShard;
this.shardGateway = shardGateway;
this.snapshotService = snapshotService;
this.recoveryState = new RecoveryState(shardId);
this.clusterService = clusterService;
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
@ -131,14 +136,11 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
void onRecoveryFailed(IndexShardGatewayRecoveryException e);
}
public RecoveryStatus recoveryStatus() {
if (recoveryStatus == null) {
return recoveryStatus;
public RecoveryState recoveryState() {
if (recoveryState.getTimer().startTime() > 0 && recoveryState.getStage() != RecoveryState.Stage.DONE) {
recoveryState.getTimer().time(System.currentTimeMillis() - recoveryState.getTimer().startTime());
}
if (recoveryStatus.startTime() > 0 && recoveryStatus.stage() != RecoveryStatus.Stage.DONE) {
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
}
return recoveryStatus;
return recoveryState;
}
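`recoveryState()` above recomputes the elapsed time on every read while a recovery is still in flight, and returns the frozen value once the stage reaches `DONE`. A self-contained sketch of that read-side timer pattern (class and method names are illustrative):

```java
// Sketch of the read-side timer update in recoveryState(): elapsed time is
// recomputed on each read while recovery is running, frozen once done.
class RecoveryTimer {
    private long startTime;  // 0 until recovery begins
    private long time;       // elapsed millis, frozen once done
    private boolean done;

    public void start(long nowMillis) {
        this.startTime = nowMillis;
    }

    public void markDone(long nowMillis) {
        this.time = nowMillis - startTime;
        this.done = true;
    }

    public long time(long nowMillis) {
        // Mirrors: if startTime > 0 and stage != DONE, refresh elapsed time.
        if (startTime > 0 && !done) {
            time = nowMillis - startTime;
        }
        return time;
    }
}
```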
public SnapshotStatus snapshotStatus() {
@ -177,22 +179,28 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
recoveryStatus = new RecoveryStatus();
recoveryStatus.updateStage(RecoveryStatus.Stage.INIT);
recoveryState.getTimer().startTime(System.currentTimeMillis());
recoveryState.setTargetNode(clusterService.localNode());
recoveryState.setStage(RecoveryState.Stage.INIT);
recoveryState.setPrimary(indexShard.routingEntry().primary());
try {
if (indexShard.routingEntry().restoreSource() != null) {
logger.debug("restoring from {} ...", indexShard.routingEntry().restoreSource());
snapshotService.restore(recoveryStatus);
recoveryState.setType(RecoveryState.Type.SNAPSHOT);
recoveryState.setRestoreSource(indexShard.routingEntry().restoreSource());
snapshotService.restore(recoveryState);
} else {
logger.debug("starting recovery from {} ...", shardGateway);
shardGateway.recover(indexShouldExists, recoveryStatus);
recoveryState.setType(RecoveryState.Type.GATEWAY);
recoveryState.setSourceNode(clusterService.localNode());
shardGateway.recover(indexShouldExists, recoveryState);
}
lastIndexVersion = recoveryStatus.index().version();
lastIndexVersion = recoveryState.getIndex().version();
lastTranslogId = -1;
lastTranslogLength = 0;
lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations();
lastTotalTranslogOperations = recoveryState.getTranslog().currentTranslogOperations();
// start the shard if the gateway has not started it already. Note that if the gateway
// moved shard to POST_RECOVERY, it may have been started as well if:
@ -206,20 +214,20 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
// refresh the shard
indexShard.refresh(new Engine.Refresh("post_gateway").force(true));
recoveryStatus.time(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
recoveryState.getTimer().time(System.currentTimeMillis() - recoveryState.getTimer().startTime());
recoveryState.setStage(RecoveryState.Stage.DONE);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryStatus.time())).append("]\n");
sb.append(" index : files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().totalSize())).append("], took[").append(TimeValue.timeValueMillis(recoveryStatus.index().time())).append("]\n");
sb.append(" : recovered_files [").append(recoveryStatus.index().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().recoveredTotalSize())).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfReusedFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryStatus.index().reusedTotalSize())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryStatus.start().time())).append("], check_index [").append(timeValueMillis(recoveryStatus.start().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryStatus.translog().time())).append("]");
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(timeValueMillis(recoveryState.getTimer().time())).append("]\n");
sb.append(" index : files [").append(recoveryState.getIndex().totalFileCount()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().totalByteCount())).append("], took[").append(TimeValue.timeValueMillis(recoveryState.getIndex().time())).append("]\n");
sb.append(" : recovered_files [").append(recoveryState.getIndex().numberOfRecoveredFiles()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().recoveredTotalSize())).append("]\n");
sb.append(" : reusing_files [").append(recoveryState.getIndex().reusedFileCount()).append("] with total_size [").append(new ByteSizeValue(recoveryState.getIndex().reusedByteCount())).append("]\n");
sb.append(" start : took [").append(TimeValue.timeValueMillis(recoveryState.getStart().time())).append("], check_index [").append(timeValueMillis(recoveryState.getStart().checkIndexTime())).append("]\n");
sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("recovery completed from [{}], took [{}]", shardGateway, timeValueMillis(recoveryStatus.time()));
logger.debug("recovery completed from [{}], took [{}]", shardGateway, timeValueMillis(recoveryState.getTimer().time()));
}
listener.onRecoveryDone();
scheduleSnapshotIfNeeded();


@ -1,220 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*/
public class RecoveryStatus {
public static enum Stage {
INIT,
INDEX,
START,
TRANSLOG,
DONE
}
private Stage stage = Stage.INIT;
private long startTime = System.currentTimeMillis();
private long time;
private Index index = new Index();
private Translog translog = new Translog();
private Start start = new Start();
public Stage stage() {
return this.stage;
}
public RecoveryStatus updateStage(Stage stage) {
this.stage = stage;
return this;
}
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public Index index() {
return index;
}
public Start start() {
return this.start;
}
public Translog translog() {
return translog;
}
public static class Start {
private long startTime;
private long time;
private long checkIndexTime;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public long checkIndexTime() {
return checkIndexTime;
}
public void checkIndexTime(long checkIndexTime) {
this.checkIndexTime = checkIndexTime;
}
}
public static class Translog {
private long startTime = 0;
private long time;
private volatile int currentTranslogOperations = 0;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public void addTranslogOperations(int count) {
this.currentTranslogOperations += count;
}
public int currentTranslogOperations() {
return this.currentTranslogOperations;
}
}
public static class Index {
private long startTime = 0;
private long time = 0;
private long version = -1;
private int numberOfFiles = 0;
private long totalSize = 0;
private int numberOfReusedFiles = 0;
private long reusedTotalSize = 0;
private AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public long version() {
return this.version;
}
public void files(int numberOfFiles, long totalSize, int numberOfReusedFiles, long reusedTotalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.numberOfReusedFiles = numberOfReusedFiles;
this.reusedTotalSize = reusedTotalSize;
}
public int numberOfFiles() {
return numberOfFiles;
}
public int numberOfRecoveredFiles() {
return numberOfFiles - numberOfReusedFiles;
}
public long totalSize() {
return this.totalSize;
}
public int numberOfReusedFiles() {
return numberOfReusedFiles;
}
public long reusedTotalSize() {
return this.reusedTotalSize;
}
public long recoveredTotalSize() {
return totalSize - reusedTotalSize;
}
public void updateVersion(long version) {
this.version = version;
}
public long currentFilesSize() {
return this.currentFilesSize.get();
}
public void addCurrentFilesSize(long updatedSize) {
this.currentFilesSize.addAndGet(updatedSize);
}
}
}
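Note that the deleted `RecoveryStatus` class derived its "recovered" figures rather than tracking them: recovered files are total files minus reused files, and recovered bytes are total size minus reused size. The percent-complete figure the new API reports follows from the same byte counts. A self-contained sketch of the arithmetic (the percentage formula is an assumption based on the commit message, not code from this diff):

```java
// Derived recovery metrics, mirroring the deleted RecoveryStatus.Index class.
class RecoveryMath {
    // numberOfRecoveredFiles() was numberOfFiles - numberOfReusedFiles
    public static int numberOfRecoveredFiles(int numberOfFiles, int numberOfReusedFiles) {
        return numberOfFiles - numberOfReusedFiles;
    }

    // recoveredTotalSize() was totalSize - reusedTotalSize
    public static long recoveredTotalSize(long totalSize, long reusedTotalSize) {
        return totalSize - reusedTotalSize;
    }

    // Assumed percent-complete calculation: bytes copied so far over total bytes.
    public static float percentComplete(long recoveredByteCount, long totalByteCount) {
        return totalByteCount == 0 ? 100.0f : 100.0f * recoveredByteCount / totalByteCount;
    }
}
```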


@ -46,6 +46,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -77,7 +78,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final ImmutableBlobContainer blobContainer;
private volatile RecoveryStatus recoveryStatus;
private volatile RecoveryState recoveryState;
private volatile SnapshotStatus lastSnapshotStatus;
@ -99,12 +100,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
this.blobContainer = blobStore.immutableBlobContainer(shardPath);
this.recoveryStatus = new RecoveryStatus();
this.recoveryState = new RecoveryState();
}
@Override
public RecoveryStatus recoveryStatus() {
return this.recoveryStatus;
public RecoveryState recoveryState() {
return this.recoveryState;
}
@Override
@ -365,8 +366,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
@Override
public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
this.recoveryStatus = recoveryStatus;
public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException {
this.recoveryState = recoveryState;
final ImmutableMap<String, BlobMetaData> blobs;
try {
@ -400,8 +401,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
}
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryState.getIndex().startTime(System.currentTimeMillis());
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
return;
}
@ -411,9 +412,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
continue;
}
try {
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryState.getIndex().startTime(System.currentTimeMillis());
recoverIndex(commitPoint, blobs);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
recoverTranslog(commitPoint, blobs);
return;
@ -427,23 +428,23 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private void recoverTranslog(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws IndexShardGatewayRecoveryException {
if (commitPoint.translogFiles().isEmpty()) {
// no translog files, bail
recoveryStatus.start().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
recoveryState.getStart().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.START);
indexShard.postRecovery("post recovery from gateway, no translog");
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
return;
}
try {
recoveryStatus.start().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
recoveryState.getStart().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.START);
indexShard.performRecoveryPrepareForTranslog();
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
recoveryState.getTranslog().startTime(System.currentTimeMillis());
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
@ -481,14 +482,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if ((si.position() - curPos) != opSize) {
logger.warn("mismatch in size, expected [{}], got [{}]", opSize, si.position() - curPos);
}
recoveryStatus.translog().addTranslogOperations(1);
recoveryState.getTranslog().addTranslogOperations(1);
indexShard.performRecoveryOperation(operation);
if (si.position() >= bos.size()) {
position = si.position();
break;
}
} catch (Throwable e) {
logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, recoveryStatus.translog().currentTranslogOperations());
logger.warn("failed to retrieve translog after [{}] operations, ignoring the rest, considered corrupted", e, recoveryState.getTranslog().currentTranslogOperations());
ignore = true;
latch.countDown();
return;
@ -531,14 +532,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
indexShard.performRecoveryFinalization(true);
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
} catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog", e);
}
}
private void recoverIndex(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws Exception {
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfReusedFiles = 0;
@ -576,7 +577,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
recoveryStatus.index().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
recoveryState.getIndex().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exist within the local store");
}
@ -611,7 +612,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
recoveryStatus.index().updateVersion(version);
recoveryState.getIndex().updateVersion(version);
/// now, go over and clean files that are in the store, but were not in the gateway
try {
@ -657,7 +658,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryStatus.index().addCurrentFilesSize(size);
recoveryState.getIndex().addRecoveredByteCount(size);
indexOutput.writeBytes(data, offset, size);
}
@ -678,6 +679,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
recoveryState.getIndex().addRecoveredFileCount(1);
} catch (IOException e) {
onFailure(e);
return;
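The `ReadBlobListener` above accounts for progress chunk by chunk: each `onPartial` call adds the chunk size to the recovered byte count, and the recovered file counter is bumped once after the whole file has been written and synced. A self-contained sketch of that accounting (the stream handling and class name are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch of the chunked progress accounting in the blob read listener:
// bytes are counted per chunk, files are counted once on completion.
class ChunkedCopy {
    private long recoveredBytes;
    private int recoveredFiles;

    public void copyFile(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            recoveredBytes += read;   // onPartial(): addRecoveredByteCount(size)
        }
        recoveredFiles += 1;          // on completion: addRecoveredFileCount(1)
    }

    public long recoveredBytes() {
        return recoveredBytes;
    }

    public int recoveredFiles() {
        return recoveredFiles;
    }
}
```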


@ -31,7 +31,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@ -61,7 +61,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
private final InternalIndexShard indexShard;
private final RecoveryStatus recoveryStatus = new RecoveryStatus();
private final RecoveryState recoveryState = new RecoveryState();
private volatile ScheduledFuture flushScheduler;
private final TimeValue syncInterval;
@ -90,14 +90,14 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
}
@Override
public RecoveryStatus recoveryStatus() {
return recoveryStatus;
public RecoveryState recoveryState() {
return recoveryState;
}
@Override
public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException {
recoveryState.getIndex().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.INDEX);
long version = -1;
long translogId = -1;
try {
@ -135,8 +135,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId(), "failed to fetch index version after copying it over", e);
}
recoveryStatus.index().updateVersion(version);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryState.getIndex().updateVersion(version);
recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
// since we recover from local, just fill the files and size
try {
@ -144,21 +144,25 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
long totalSizeInBytes = 0;
for (String name : indexShard.store().directory().listAll()) {
numberOfFiles++;
totalSizeInBytes += indexShard.store().directory().fileLength(name);
long length = indexShard.store().directory().fileLength(name);
totalSizeInBytes += length;
recoveryState.getIndex().addFileDetail(name, length, length);
}
recoveryStatus.index().files(numberOfFiles, totalSizeInBytes, numberOfFiles, totalSizeInBytes);
recoveryState.getIndex().files(numberOfFiles, totalSizeInBytes, numberOfFiles, totalSizeInBytes);
recoveryState.getIndex().recoveredFileCount(numberOfFiles);
recoveryState.getIndex().recoveredByteCount(totalSizeInBytes);
} catch (Exception e) {
// ignore
}
recoveryStatus.start().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.START);
recoveryState.getStart().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.START);
if (translogId == -1) {
// no translog files, bail
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
return;
}
@ -192,18 +196,18 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
// no translog files, bail
indexShard.postRecovery("post recovery from gateway, no translog");
// no index, just start the shard and bail
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
return;
}
// recover from the translog file
indexShard.performRecoveryPrepareForTranslog();
recoveryStatus.start().time(System.currentTimeMillis() - recoveryStatus.start().startTime());
recoveryStatus.start().checkIndexTime(indexShard.checkIndexTook());
recoveryState.getStart().time(System.currentTimeMillis() - recoveryState.getStart().startTime());
recoveryState.getStart().checkIndexTime(indexShard.checkIndexTook());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
recoveryState.getTranslog().startTime(System.currentTimeMillis());
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
FileInputStream fs = null;
try {
fs = new FileInputStream(recoveringTranslogFile);
@ -222,7 +226,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
}
try {
indexShard.performRecoveryOperation(operation);
recoveryStatus.translog().addTranslogOperations(1);
recoveryState.getTranslog().addTranslogOperations(1);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
@@ -247,7 +251,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
recoveringTranslogFile.delete();
-recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
+recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getTranslog().startTime());
}
@Override


@@ -24,7 +24,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
-import org.elasticsearch.index.gateway.RecoveryStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@@ -41,7 +41,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
private final InternalIndexShard indexShard;
-private final RecoveryStatus recoveryStatus = new RecoveryStatus();
+private final RecoveryState recoveryState = new RecoveryState();
@Inject
public NoneIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) {
@@ -55,13 +55,13 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
}
@Override
-public RecoveryStatus recoveryStatus() {
-return recoveryStatus;
+public RecoveryState recoveryState() {
+return recoveryState;
}
@Override
-public void recover(boolean indexShouldExists, RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
-recoveryStatus.index().startTime(System.currentTimeMillis());
+public void recover(boolean indexShouldExists, RecoveryState recoveryState) throws IndexShardGatewayRecoveryException {
+recoveryState.getIndex().startTime(System.currentTimeMillis());
// in the none case, we simply start the shard
// clean the store, there should be nothing there...
try {
@@ -71,9 +71,9 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
logger.warn("failed to clean store before starting shard", e);
}
indexShard.postRecovery("post recovery from gateway");
-recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
-recoveryStatus.translog().startTime(System.currentTimeMillis());
-recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
+recoveryState.getTranslog().startTime(System.currentTimeMillis());
+recoveryState.getTranslog().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
}
@Override


@@ -135,7 +135,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;
-private RecoveryStatus peerRecoveryStatus;
+private RecoveryStatus recoveryStatus;
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
@@ -702,13 +702,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
/**
* The peer recovery status if this shard recovered from a peer shard.
*/
-public RecoveryStatus peerRecoveryStatus() {
-return this.peerRecoveryStatus;
+public RecoveryStatus recoveryStatus() {
+return this.recoveryStatus;
}
-public void performRecoveryFinalization(boolean withFlush, RecoveryStatus peerRecoveryStatus) throws ElasticsearchException {
+public void performRecoveryFinalization(boolean withFlush, RecoveryStatus recoveryStatus) throws ElasticsearchException {
performRecoveryFinalization(withFlush);
-this.peerRecoveryStatus = peerRecoveryStatus;
+this.recoveryStatus = recoveryStatus;
}
}
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {


@@ -21,7 +21,7 @@ package org.elasticsearch.index.snapshots;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
-import org.elasticsearch.index.gateway.RecoveryStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.shard.ShardId;
/**
@@ -56,9 +56,9 @@ public interface IndexShardRepository {
* @param snapshotId snapshot id
* @param shardId shard id (in the current index)
* @param snapshotShardId shard id (in the snapshot)
-* @param recoveryStatus recovery status
+* @param recoveryState recovery state
*/
-void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryStatus recoveryStatus);
+void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot


@@ -27,7 +27,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
-import org.elasticsearch.index.gateway.RecoveryStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
@@ -104,9 +104,9 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo
/**
* Restores shard from {@link RestoreSource} associated with this shard in routing table
*
-* @param recoveryStatus recovery status
+* @param recoveryState recovery state
*/
-public void restore(final RecoveryStatus recoveryStatus) {
+public void restore(final RecoveryState recoveryState) {
RestoreSource restoreSource = indexShard.routingEntry().restoreSource();
if (restoreSource == null) {
throw new IndexShardRestoreFailedException(shardId, "empty restore source");
@@ -120,7 +120,7 @@ public class IndexShardSnapshotAndRestoreService extends AbstractIndexShardCompo
if (!shardId.getIndex().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
}
-indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryStatus);
+indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState);
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);
} catch (Throwable t) {
throw new IndexShardRestoreFailedException(shardId, "restore failed", t);


@@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
-import org.elasticsearch.index.gateway.RecoveryStatus;
+import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.*;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@@ -145,13 +145,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* {@inheritDoc}
*/
@Override
-public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryStatus recoveryStatus) {
-RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryStatus);
+public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
+RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState);
try {
-recoveryStatus.index().startTime(System.currentTimeMillis());
+recoveryState.getIndex().startTime(System.currentTimeMillis());
snapshotContext.restore();
-recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+recoveryState.getIndex().time(System.currentTimeMillis() - recoveryState.getIndex().startTime());
} catch (Throwable e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId.getSnapshot() + "]", e);
}
@@ -638,7 +638,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private final Store store;
-private final RecoveryStatus recoveryStatus;
+private final RecoveryState recoveryState;
/**
* Constructs new restore context
@@ -646,12 +646,12 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @param snapshotId snapshot id
* @param shardId shard to be restored
* @param snapshotShardId shard in the snapshot that data should be restored from
-* @param recoveryStatus recovery status to report progress
+* @param recoveryState recovery state to report progress
*/
-public RestoreContext(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryStatus recoveryStatus) {
+public RestoreContext(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, shardId, snapshotShardId);
store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class);
-this.recoveryStatus = recoveryStatus;
+this.recoveryState = recoveryState;
}
/**
@@ -661,7 +661,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
-recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
+recoveryState.setStage(RecoveryState.Stage.INDEX);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfReusedFiles = 0;
@@ -682,12 +682,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += md.length();
+recoveryState.getIndex().addReusedFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
logger.trace("not_recovering [{}], exists in local store and is same", fileInfo.physicalName());
}
} else {
totalSize += fileInfo.length();
filesToRecover.add(fileInfo);
+recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length());
if (logger.isTraceEnabled()) {
if (md == null) {
logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
@@ -698,7 +700,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
}
}
-recoveryStatus.index().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
+recoveryState.getIndex().files(numberOfFiles, totalSize, numberOfReusedFiles, reusedTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
@@ -734,7 +736,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
-recoveryStatus.index().updateVersion(version);
+recoveryState.getIndex().updateVersion(version);
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
@@ -779,7 +781,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override
public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
-recoveryStatus.index().addCurrentFilesSize(size);
+recoveryState.getIndex().addRecoveredByteCount(size);
+RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
+if (file != null) {
+file.updateRecovered(size);
+}
indexOutput.writeBytes(data, offset, size);
if (restoreRateLimiter != null) {
rateLimiterListener.onRestorePause(restoreRateLimiter.pause(size));
@@ -803,6 +809,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
}
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+recoveryState.getIndex().addRecoveredFileCount(1);
} catch (IOException e) {
onFailure(e);
return;


@@ -61,6 +61,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
+import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
@@ -665,7 +666,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId());
try {
// we are recovering a backup from a primary, so no need to mark it as relocated
-final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
+final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
+false, indexShard.store().list(), RecoveryState.Type.REPLICA);
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
@@ -701,7 +703,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
try {
// we don't mark this one as relocated at the end, requests in any case are routed to both when its relocating
// and that way we handle the edge case where its mark as relocated, and we might need to roll it back...
-final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
+final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(),
+false, indexShard.store().list(), RecoveryState.Type.RELOCATION);
recoveryTarget.startRecovery(request, indexShard, new PeerRecoveryListener(request, shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);


@@ -0,0 +1,808 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RestoreSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Locale;
/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContent, Streamable {
public static enum Stage {
INIT((byte) 0),
INDEX((byte) 1),
START((byte) 2),
TRANSLOG((byte) 3),
FINALIZE((byte) 4),
DONE((byte) 5);
private static final Stage[] STAGES = new Stage[Stage.values().length];
static {
for (Stage stage : Stage.values()) {
assert stage.id() < STAGES.length && stage.id() >= 0;
STAGES[stage.id] = stage;
}
}
private final byte id;
Stage(byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static Stage fromId(byte id) throws ElasticsearchIllegalArgumentException {
if (id < 0 || id >= STAGES.length) {
throw new ElasticsearchIllegalArgumentException("No mapping for id [" + id + "]");
}
return STAGES[id];
}
}
public static enum Type {
GATEWAY((byte) 0),
SNAPSHOT((byte) 1),
REPLICA((byte) 2),
RELOCATION((byte) 3);
private static final Type[] TYPES = new Type[Type.values().length];
static {
for (Type type : Type.values()) {
assert type.id() < TYPES.length && type.id() >= 0;
TYPES[type.id] = type;
}
}
private final byte id;
Type(byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static Type fromId(byte id) throws ElasticsearchIllegalArgumentException {
if (id < 0 || id >= TYPES.length) {
throw new ElasticsearchIllegalArgumentException("No mapping for id [" + id + "]");
}
return TYPES[id];
}
}
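Both `Stage` and `Type` above follow the same compact-serialization idiom: each constant carries a fixed byte id, a static lookup array is built once, and `fromId()` rejects ids no constant claims, so a corrupted stream fails fast instead of silently mapping to the wrong constant. A minimal standalone sketch of the pattern (the `Phase` enum here is hypothetical, not part of the commit):

```java
public class PhaseDemo {
    // Hypothetical enum mirroring RecoveryState.Stage's byte-id idiom.
    public enum Phase {
        INIT((byte) 0), COPY((byte) 1), DONE((byte) 2);

        private static final Phase[] PHASES = new Phase[Phase.values().length];

        static {
            // Enum constants exist before this block runs, so the
            // dense-id lookup array can be filled here once.
            for (Phase p : Phase.values()) {
                PHASES[p.id] = p;
            }
        }

        private final byte id;

        Phase(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        // Fail fast on ids no constant claims (e.g. from a corrupt stream).
        public static Phase fromId(byte id) {
            if (id < 0 || id >= PHASES.length) {
                throw new IllegalArgumentException("No mapping for id [" + id + "]");
            }
            return PHASES[id];
        }
    }

    public static void main(String[] args) {
        for (Phase p : Phase.values()) {
            System.out.println(p + " <-> " + Phase.fromId(p.id()));
        }
    }
}
```

Because the ids are dense and start at zero, an array lookup is enough; a `Map<Byte, Phase>` would work too but costs boxing on every read.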
private volatile Stage stage = Stage.INIT;
private Index index = new Index();
private Translog translog = new Translog();
private Start start = new Start();
private Timer timer = new Timer();
private Type type;
private ShardId shardId;
private RestoreSource restoreSource;
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private boolean detailed = false;
private boolean primary = false;
public RecoveryState() { }
public RecoveryState(ShardId shardId) {
this.shardId = shardId;
}
public ShardId getShardId() {
return shardId;
}
public Stage getStage() {
return this.stage;
}
public RecoveryState setStage(Stage stage) {
this.stage = stage;
if (stage == Stage.DONE) {
timer.stopTime(System.currentTimeMillis());
}
return this;
}
public Index getIndex() {
return index;
}
public Start getStart() {
return this.start;
}
public Translog getTranslog() {
return translog;
}
public Timer getTimer() {
return timer;
}
public Type getType() {
return type;
}
public void setType(Type type) {
this.type = type;
}
public void setSourceNode(DiscoveryNode sourceNode) {
this.sourceNode = sourceNode;
}
public DiscoveryNode getSourceNode() {
return sourceNode;
}
public void setTargetNode(DiscoveryNode targetNode) {
this.targetNode = targetNode;
}
public DiscoveryNode getTargetNode() {
return targetNode;
}
public void setRestoreSource(RestoreSource restoreSource) {
this.restoreSource = restoreSource;
}
public RestoreSource getRestoreSource() {
return restoreSource;
}
public void setDetailed(boolean detailed) {
this.detailed = detailed;
this.index.detailed(detailed);
}
public void setPrimary(boolean primary) {
this.primary = primary;
}
public boolean getPrimary() {
return primary;
}
public static RecoveryState readRecoveryState(StreamInput in) throws IOException {
RecoveryState recoveryState = new RecoveryState();
recoveryState.readFrom(in);
return recoveryState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
timer.startTime(in.readVLong());
timer.stopTime(in.readVLong());
timer.time(in.readVLong());
type = Type.fromId(in.readByte());
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
restoreSource = RestoreSource.readOptionalRestoreSource(in);
targetNode = DiscoveryNode.readNode(in);
if (in.readBoolean()) {
sourceNode = DiscoveryNode.readNode(in);
}
index = Index.readIndex(in);
translog = Translog.readTranslog(in);
start = Start.readStart(in);
detailed = in.readBoolean();
primary = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(timer.startTime());
out.writeVLong(timer.stopTime());
out.writeVLong(timer.time());
out.writeByte(type.id());
out.writeByte(stage.id());
shardId.writeTo(out);
out.writeOptionalStreamable(restoreSource);
targetNode.writeTo(out);
out.writeBoolean(sourceNode != null);
if (sourceNode != null) {
sourceNode.writeTo(out);
}
index.writeTo(out);
translog.writeTo(out);
start.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(primary);
}
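The `readFrom`/`writeTo` pair above must touch the same fields in the same order, including the boolean presence flag written before the optional `sourceNode`; any asymmetry corrupts every field that follows on the wire. A toy illustration of that contract using plain `java.io` streams (the `WireDemo` class and its fields are hypothetical, standing in for the Streamable machinery):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WireDemo {
    long startTime;
    String sourceNode; // optional, may be null

    void writeTo(DataOutputStream out) throws IOException {
        out.writeLong(startTime);
        out.writeBoolean(sourceNode != null); // presence flag before optional field
        if (sourceNode != null) {
            out.writeUTF(sourceNode);
        }
    }

    void readFrom(DataInputStream in) throws IOException {
        startTime = in.readLong();            // exact same order as writeTo
        if (in.readBoolean()) {
            sourceNode = in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        WireDemo original = new WireDemo();
        original.startTime = 12345L;
        original.sourceNode = "node-1";

        // Round-trip through an in-memory buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.writeTo(new DataOutputStream(buf));

        WireDemo copy = new WireDemo();
        copy.readFrom(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.startTime + " " + copy.sourceNode);
    }
}
```

The same round-trip style is what `readRecoveryState(StreamInput)` relies on: construct an empty instance, then replay the writes as reads.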
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.ID, shardId.id());
builder.field(Fields.TYPE, type.toString());
builder.field(Fields.STAGE, stage.toString());
builder.field(Fields.PRIMARY, primary);
builder.timeValueField(Fields.START_TIME_IN_MILLIS, Fields.START_TIME, timer.startTime);
builder.timeValueField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, timer.stopTime);
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, timer.time);
if (restoreSource != null) {
builder.field(Fields.SOURCE);
restoreSource.toXContent(builder, params);
} else {
builder.startObject(Fields.SOURCE);
builder.field(Fields.ID, sourceNode.id());
builder.field(Fields.HOST, sourceNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, sourceNode.address().toString());
builder.field(Fields.IP, sourceNode.getHostAddress());
builder.field(Fields.NAME, sourceNode.name());
builder.endObject();
}
builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.id());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.address().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.name());
builder.endObject();
builder.startObject(Fields.INDEX);
index.detailed(this.detailed);
index.toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.TRANSLOG);
translog.toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.START);
start.toXContent(builder, params);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString ID = new XContentBuilderString("id");
static final XContentBuilderString TYPE = new XContentBuilderString("type");
static final XContentBuilderString STAGE = new XContentBuilderString("stage");
static final XContentBuilderString PRIMARY = new XContentBuilderString("primary");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString START_TIME_IN_MILLIS = new XContentBuilderString("start_time_in_millis");
static final XContentBuilderString STOP_TIME = new XContentBuilderString("stop_time");
static final XContentBuilderString STOP_TIME_IN_MILLIS = new XContentBuilderString("stop_time_in_millis");
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
static final XContentBuilderString SOURCE = new XContentBuilderString("source");
static final XContentBuilderString HOST = new XContentBuilderString("host");
static final XContentBuilderString TRANSPORT_ADDRESS = new XContentBuilderString("transport_address");
static final XContentBuilderString IP = new XContentBuilderString("ip");
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString TARGET = new XContentBuilderString("target");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString TRANSLOG = new XContentBuilderString("translog");
static final XContentBuilderString START = new XContentBuilderString("start");
static final XContentBuilderString RECOVERED = new XContentBuilderString("recovered");
static final XContentBuilderString CHECK_INDEX_TIME = new XContentBuilderString("check_index_time");
static final XContentBuilderString CHECK_INDEX_TIME_IN_MILLIS = new XContentBuilderString("check_index_time_in_millis");
static final XContentBuilderString LENGTH = new XContentBuilderString("length");
static final XContentBuilderString FILES = new XContentBuilderString("files");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString REUSED = new XContentBuilderString("reused");
static final XContentBuilderString PERCENT = new XContentBuilderString("percent");
static final XContentBuilderString DETAILS = new XContentBuilderString("details");
static final XContentBuilderString BYTES = new XContentBuilderString("bytes");
}
public static class Timer {
private long startTime = 0;
private long time = 0;
private long stopTime = 0;
public long startTime() {
return startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return time;
}
public void time(long time) {
this.time = time;
}
public long stopTime() {
return stopTime;
}
public void stopTime(long stopTime) {
this.stopTime = stopTime;
}
}
public static class Start implements ToXContent, Streamable {
private long startTime;
private long time;
private long checkIndexTime;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public long checkIndexTime() {
return checkIndexTime;
}
public void checkIndexTime(long checkIndexTime) {
this.checkIndexTime = checkIndexTime;
}
public static Start readStart(StreamInput in) throws IOException {
Start start = new Start();
start.readFrom(in);
return start;
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
checkIndexTime = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(time);
out.writeVLong(checkIndexTime);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.timeValueField(Fields.CHECK_INDEX_TIME_IN_MILLIS, Fields.CHECK_INDEX_TIME, checkIndexTime);
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time);
return builder;
}
}
public static class Translog implements ToXContent, Streamable {
private long startTime = 0;
private long time;
private volatile int currentTranslogOperations = 0;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public void addTranslogOperations(int count) {
this.currentTranslogOperations += count;
}
public void incrementTranslogOperations() {
this.currentTranslogOperations++;
}
public int currentTranslogOperations() {
return this.currentTranslogOperations;
}
public static Translog readTranslog(StreamInput in) throws IOException {
Translog translog = new Translog();
translog.readFrom(in);
return translog;
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
currentTranslogOperations = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(time);
out.writeVInt(currentTranslogOperations);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.RECOVERED, currentTranslogOperations);
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time);
return builder;
}
}
public static class File implements ToXContent, Streamable {
String name;
long length;
long recovered;
public File() { }
public File(String name, long length) {
this.name = name;
this.length = length;
}
public void updateRecovered(long length) {
recovered += length;
}
public static File readFile(StreamInput in) throws IOException {
File file = new File();
file.readFrom(in);
return file;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.NAME, name);
builder.field(Fields.LENGTH, length);
builder.field(Fields.RECOVERED, recovered);
builder.endObject();
return builder;
}
}
public static class Index implements ToXContent, Streamable {
private long startTime = 0;
private long time = 0;
private List<File> fileDetails = new ArrayList<File>();
private List<File> reusedFileDetails = new ArrayList<File>();
private long version = -1;
private boolean detailed = false;
private int totalFileCount = 0;
private int reusedFileCount = 0;
private AtomicInteger recoveredFileCount = new AtomicInteger();
private long totalByteCount = 0;
private long reusedByteCount = 0;
private AtomicLong recoveredByteCount = new AtomicLong();
public List<File> fileDetails() {
return fileDetails;
}
public List<File> reusedFileDetails() {
return reusedFileDetails;
}
public void addFileDetail(String name, long length) {
fileDetails.add(new File(name, length));
}
public void addFileDetail(String name, long length, long recovered) {
File file = new File(name, length);
file.recovered = recovered;
fileDetails.add(file);
}
public void addFileDetails(List<String> names, List<Long> lengths) {
for (int i = 0; i < names.size(); i++) {
fileDetails.add(new File(names.get(i), lengths.get(i)));
}
}
public void addReusedFileDetail(String name, long length) {
reusedFileDetails.add(new File(name, length));
}
public void addReusedFileDetails(List<String> names, List<Long> lengths) {
for (int i = 0; i < names.size(); i++) {
reusedFileDetails.add(new File(names.get(i), lengths.get(i)));
}
}
public File file(String name) {
for (File file : fileDetails) {
if (file.name.equals(name))
return file;
}
for (File file : reusedFileDetails) {
if (file.name.equals(name)) {
return file;
}
}
return null;
}
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public long time() {
return this.time;
}
public void time(long time) {
this.time = time;
}
public long version() {
return this.version;
}
public void files(int totalFileCount, long totalByteCount, int reusedFileCount, long reusedByteCount) {
this.totalFileCount = totalFileCount;
this.totalByteCount = totalByteCount;
this.reusedFileCount = reusedFileCount;
this.reusedByteCount = reusedByteCount;
}
public int totalFileCount() {
return totalFileCount;
}
public void totalFileCount(int totalFileCount) {
this.totalFileCount = totalFileCount;
}
public int recoveredFileCount() {
return recoveredFileCount.get();
}
public void recoveredFileCount(int recoveredFileCount) {
this.recoveredFileCount.set(recoveredFileCount);
}
public void addRecoveredFileCount(int updatedCount) {
this.recoveredFileCount.addAndGet(updatedCount);
}
public float percentFilesRecovered(int numberRecovered) {
if (totalFileCount == 0) { // indicates we are still in init phase
return 0.0f;
}
if ((totalFileCount - reusedFileCount) == 0) {
return 100.0f;
} else {
int d = totalFileCount - reusedFileCount;
float result = 100.0f * (numberRecovered / (float) d);
return result;
}
}
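The percent-complete figure reported by the API measures progress against the files that actually need copying (total minus reused), with two guards: before the file list is known the method reports 0%, and when every file was reused locally it reports 100%. A standalone sketch of the same arithmetic (`PercentDemo` is illustrative, not part of the commit):

```java
public class PercentDemo {
    // Same shape as RecoveryState.Index#percentFilesRecovered: the
    // denominator is files that must be copied, not the raw total.
    static float percentRecovered(int recovered, int total, int reused) {
        if (total == 0) {          // stats not initialized yet
            return 0.0f;
        }
        int toRecover = total - reused;
        if (toRecover == 0) {      // every file was reused from the local store
            return 100.0f;
        }
        return 100.0f * (recovered / (float) toRecover);
    }

    public static void main(String[] args) {
        System.out.println(percentRecovered(5, 20, 10)); // 5 of 10 files copied
        System.out.println(percentRecovered(0, 0, 0));   // init phase
        System.out.println(percentRecovered(0, 8, 8));   // all reused
    }
}
```

Dividing by the raw total instead would make a recovery that reuses most of its files look permanently stalled below 100%.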
public int numberOfRecoveredFiles() {
return totalFileCount - reusedFileCount;
}
public long totalByteCount() {
return this.totalByteCount;
}
public void totalByteCount(long totalByteCount) {
this.totalByteCount = totalByteCount;
}
public long recoveredByteCount() {
return recoveredByteCount.longValue();
}
public void recoveredByteCount(long recoveredByteCount) {
this.recoveredByteCount.set(recoveredByteCount);
}
public void addRecoveredByteCount(long updatedSize) {
recoveredByteCount.addAndGet(updatedSize);
}
public long numberOfRecoveredBytes() {
return recoveredByteCount.get() - reusedByteCount;
}
public float percentBytesRecovered(long numberRecovered) {
if (totalByteCount == 0) { // indicates we are still in init phase
return 0.0f;
}
if ((totalByteCount - reusedByteCount) == 0) {
return 100.0f;
} else {
long d = totalByteCount - reusedByteCount;
float result = 100.0f * (numberRecovered / (float) d);
return result;
}
}
public int reusedFileCount() {
return reusedFileCount;
}
public long reusedByteCount() {
return this.reusedByteCount;
}
public void reusedByteCount(long reusedByteCount) {
this.reusedByteCount = reusedByteCount;
}
public long recoveredTotalSize() {
return totalByteCount - reusedByteCount;
}
public void updateVersion(long version) {
this.version = version;
}
public void detailed(boolean detailed) {
this.detailed = detailed;
}
public static Index readIndex(StreamInput in) throws IOException {
Index index = new Index();
index.readFrom(in);
return index;
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
totalFileCount = in.readVInt();
totalByteCount = in.readVLong();
reusedFileCount = in.readVInt();
reusedByteCount = in.readVLong();
recoveredFileCount = new AtomicInteger(in.readVInt());
recoveredByteCount = new AtomicLong(in.readVLong());
int size = in.readVInt();
fileDetails = new ArrayList<File>(size);
for (int i = 0; i < size; i++) {
fileDetails.add(File.readFile(in));
}
size = in.readVInt();
reusedFileDetails = new ArrayList<File>(size);
for (int i = 0; i < size; i++) {
reusedFileDetails.add(File.readFile(in));
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(time);
out.writeVInt(totalFileCount);
out.writeVLong(totalByteCount);
out.writeVInt(reusedFileCount);
out.writeVLong(reusedByteCount);
out.writeVInt(recoveredFileCount.get());
out.writeVLong(recoveredByteCount.get());
out.writeVInt(fileDetails.size());
for (File file : fileDetails) {
file.writeTo(out);
}
out.writeVInt(reusedFileDetails.size());
for (File file : reusedFileDetails) {
file.writeTo(out);
}
}
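The readFrom/writeTo pair above must stay symmetric: fields are consumed in exactly the order they are emitted, or every subsequent field is scrambled. A minimal plain-JDK sketch of the same pattern, using DataStreams in place of Elasticsearch's StreamInput/StreamOutput (class and field names here are illustrative, not from the commit):

```java
import java.io.*;

public class StatsWireSketch {
    long startTime;
    long time;
    int totalFileCount;

    // Field order here...
    public void writeTo(DataOutputStream out) throws IOException {
        out.writeLong(startTime);
        out.writeLong(time);
        out.writeInt(totalFileCount);
    }

    // ...must match field order here exactly.
    public void readFrom(DataInputStream in) throws IOException {
        startTime = in.readLong();
        time = in.readLong();
        totalFileCount = in.readInt();
    }

    public static void main(String[] args) throws IOException {
        StatsWireSketch original = new StatsWireSketch();
        original.startTime = 123L;
        original.time = 456L;
        original.totalFileCount = 73;

        // Round-trip through an in-memory byte buffer.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.writeTo(new DataOutputStream(bytes));
        StatsWireSketch copy = new StatsWireSketch();
        copy.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.totalFileCount); // 73
    }
}
```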
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
int filesRecovered = recoveredFileCount.get();
long bytesRecovered = recoveredByteCount.get();
builder.startObject(Fields.FILES);
builder.field(Fields.TOTAL, totalFileCount);
builder.field(Fields.REUSED, reusedFileCount);
builder.field(Fields.RECOVERED, filesRecovered);
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", percentFilesRecovered(filesRecovered)));
if (detailed) {
builder.startArray(Fields.DETAILS);
for (File file : fileDetails) {
file.toXContent(builder, params);
}
for (File file : reusedFileDetails) {
file.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
builder.startObject(Fields.BYTES);
builder.field(Fields.TOTAL, totalByteCount);
builder.field(Fields.REUSED, reusedByteCount);
builder.field(Fields.RECOVERED, bytesRecovered);
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", percentBytesRecovered(bytesRecovered)));
builder.endObject();
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, time);
return builder;
}
}
}


@@ -26,33 +26,26 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*/
public class RecoveryStatus {
public static enum Stage {
INIT,
INDEX,
TRANSLOG,
FINALIZE,
DONE
}
final ShardId shardId;
final long recoveryId;
final InternalIndexShard indexShard;
final RecoveryState recoveryState;
public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) {
this.recoveryId = recoveryId;
this.indexShard = indexShard;
this.shardId = indexShard.shardId();
this.recoveryState = new RecoveryState(shardId);
recoveryState.getTimer().startTime(System.currentTimeMillis());
}
volatile Thread recoveryThread;
@@ -62,47 +55,18 @@ public class RecoveryStatus {
private volatile ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
ConcurrentMap<String, String> checksums = ConcurrentCollections.newConcurrentMap();
final long startTime = System.currentTimeMillis();
long time;
List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
List<Long> phase1ExistingFileSizes;
long phase1TotalSize;
long phase1ExistingTotalSize;
volatile Stage stage = Stage.INIT;
volatile long currentTranslogOperations = 0;
AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
return startTime;
public RecoveryState recoveryState() {
return recoveryState;
}
public long time() {
return this.time;
public void stage(RecoveryState.Stage stage) {
recoveryState.setStage(stage);
}
public long phase1TotalSize() {
return phase1TotalSize;
public RecoveryState.Stage stage() {
return recoveryState.getStage();
}
public long phase1ExistingTotalSize() {
return phase1ExistingTotalSize;
}
public Stage stage() {
return stage;
}
public long currentTranslogOperations() {
return currentTranslogOperations;
}
public long currentFilesSize() {
return currentFilesSize.get();
}
public boolean isCanceled() {
return canceled;
}


@@ -109,14 +109,14 @@ public class RecoveryTarget extends AbstractComponent {
});
}
public RecoveryStatus peerRecoveryStatus(ShardId shardId) {
public RecoveryStatus recoveryStatus(ShardId shardId) {
RecoveryStatus peerRecoveryStatus = findRecoveryByShardId(shardId);
if (peerRecoveryStatus == null) {
return null;
}
// update how long it takes if we are still recovering...
if (peerRecoveryStatus.startTime > 0 && peerRecoveryStatus.stage != RecoveryStatus.Stage.DONE) {
peerRecoveryStatus.time = System.currentTimeMillis() - peerRecoveryStatus.startTime;
if (peerRecoveryStatus.recoveryState().getTimer().startTime() > 0 && peerRecoveryStatus.stage() != RecoveryState.Stage.DONE) {
peerRecoveryStatus.recoveryState().getTimer().time(System.currentTimeMillis() - peerRecoveryStatus.recoveryState().getTimer().startTime());
}
return peerRecoveryStatus;
}
@@ -167,6 +167,10 @@ public class RecoveryTarget extends AbstractComponent {
public void run() {
// create a new recovery status, and process...
RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard);
recoveryStatus.recoveryState.setType(request.recoveryType());
recoveryStatus.recoveryState.setSourceNode(request.sourceNode());
recoveryStatus.recoveryState.setTargetNode(request.targetNode());
recoveryStatus.recoveryState.setPrimary(indexShard.routingEntry().primary());
onGoingRecoveries.put(recoveryStatus.recoveryId, recoveryStatus);
doRecovery(request, recoveryStatus, listener);
}
@@ -384,9 +388,9 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(request.shardId());
}
onGoingRecovery.stage = RecoveryStatus.Stage.TRANSLOG;
onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
onGoingRecovery.stage(RecoveryState.Stage.TRANSLOG);
onGoingRecovery.recoveryState.getStart().checkIndexTime(onGoingRecovery.indexShard.checkIndexTook());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -415,10 +419,10 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(request.shardId());
}
onGoingRecovery.stage = RecoveryStatus.Stage.FINALIZE;
onGoingRecovery.stage(RecoveryState.Stage.FINALIZE);
onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
onGoingRecovery.time = System.currentTimeMillis() - onGoingRecovery.startTime;
onGoingRecovery.stage = RecoveryStatus.Stage.DONE;
onGoingRecovery.recoveryState().getTimer().time(System.currentTimeMillis() - onGoingRecovery.recoveryState().getTimer().startTime());
onGoingRecovery.stage(RecoveryState.Stage.DONE);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -455,7 +459,7 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(request.shardId());
}
shard.performRecoveryOperation(operation);
onGoingRecovery.currentTranslogOperations++;
onGoingRecovery.recoveryState.getTranslog().incrementTranslogOperations();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@@ -485,13 +489,12 @@ public class RecoveryTarget extends AbstractComponent {
throw new IndexShardClosedException(request.shardId());
}
onGoingRecovery.phase1FileNames = request.phase1FileNames;
onGoingRecovery.phase1FileSizes = request.phase1FileSizes;
onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames;
onGoingRecovery.phase1ExistingFileSizes = request.phase1ExistingFileSizes;
onGoingRecovery.phase1TotalSize = request.phase1TotalSize;
onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize;
onGoingRecovery.stage = RecoveryStatus.Stage.INDEX;
onGoingRecovery.recoveryState().getIndex().addFileDetails(request.phase1FileNames, request.phase1FileSizes);
onGoingRecovery.recoveryState().getIndex().addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
onGoingRecovery.recoveryState().getIndex().totalByteCount(request.phase1TotalSize);
onGoingRecovery.recoveryState().getIndex().reusedByteCount(request.phase1ExistingTotalSize);
onGoingRecovery.recoveryState().getIndex().totalFileCount(request.phase1FileNames.size());
onGoingRecovery.stage(RecoveryState.Stage.INDEX);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@@ -524,7 +527,7 @@ public class RecoveryTarget extends AbstractComponent {
// first, we go and move files that were created with the recovery id suffix to
// the actual names, it's ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String prefix = "recovery." + onGoingRecovery.startTime + ".";
String prefix = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + ".";
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : store.directory().listAll()) {
if (existingFile.startsWith(prefix)) {
@@ -566,7 +569,6 @@ public class RecoveryTarget extends AbstractComponent {
class FileChunkTransportRequestHandler extends BaseTransportRequestHandler<RecoveryFileChunkRequest> {
@Override
public RecoveryFileChunkRequest newInstance() {
return new RecoveryFileChunkRequest();
@@ -607,7 +609,7 @@ public class RecoveryTarget extends AbstractComponent {
String fileName = request.name();
if (store.directory().fileExists(fileName)) {
fileName = "recovery." + onGoingRecovery.startTime + "." + fileName;
fileName = "recovery." + onGoingRecovery.recoveryState().getTimer().startTime() + "." + fileName;
}
indexOutput = onGoingRecovery.openAndPutIndexOutput(request.name(), fileName, store);
} else {
@@ -628,7 +630,11 @@ public class RecoveryTarget extends AbstractComponent {
content = content.toBytesArray();
}
indexOutput.writeBytes(content.array(), content.arrayOffset(), content.length());
onGoingRecovery.currentFilesSize.addAndGet(request.length());
onGoingRecovery.recoveryState.getIndex().addRecoveredByteCount(request.length());
RecoveryState.File file = onGoingRecovery.recoveryState.getIndex().file(request.name());
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() == request.length()) {
// we are done
indexOutput.close();
@@ -638,8 +644,8 @@ public class RecoveryTarget extends AbstractComponent {
}
store.directory().sync(Collections.singleton(request.name()));
IndexOutput remove = onGoingRecovery.removeOpenIndexOutputs(request.name());
onGoingRecovery.recoveryState.getIndex().addRecoveredFileCount(1);
assert remove == indexOutput;
}
success = true;
} finally {


@@ -50,6 +50,8 @@ public class StartRecoveryRequest extends TransportRequest {
private Map<String, StoreFileMetaData> existingFiles;
private RecoveryState.Type recoveryType;
StartRecoveryRequest() {
}
@@ -62,13 +64,15 @@ public class StartRecoveryRequest extends TransportRequest {
* @param markAsRelocated
* @param existingFiles
*/
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String, StoreFileMetaData> existingFiles) {
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String,
StoreFileMetaData> existingFiles, RecoveryState.Type recoveryType) {
this.recoveryId = recoveryIdGenerator.incrementAndGet();
this.shardId = shardId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.markAsRelocated = markAsRelocated;
this.existingFiles = existingFiles;
this.recoveryType = recoveryType;
}
public long recoveryId() {
@@ -95,6 +99,10 @@ public class StartRecoveryRequest extends TransportRequest {
return existingFiles;
}
public RecoveryState.Type recoveryType() {
return recoveryType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);


@@ -79,6 +79,7 @@ import org.elasticsearch.rest.action.admin.indices.validate.query.RestValidateQu
import org.elasticsearch.rest.action.admin.indices.warmer.delete.RestDeleteWarmerAction;
import org.elasticsearch.rest.action.admin.indices.warmer.get.RestGetWarmerAction;
import org.elasticsearch.rest.action.admin.indices.warmer.put.RestPutWarmerAction;
import org.elasticsearch.rest.action.admin.indices.recovery.RestRecoveryAction;
import org.elasticsearch.rest.action.bulk.RestBulkAction;
import org.elasticsearch.rest.action.cat.*;
import org.elasticsearch.rest.action.delete.RestDeleteAction;
@@ -212,6 +213,8 @@ public class RestActionModule extends AbstractModule {
bind(RestExplainAction.class).asEagerSingleton();
bind(RestRecoveryAction.class).asEagerSingleton();
// cat API
Multibinder<AbstractCatAction> catActionMultibinder = Multibinder.newSetBinder(binder(), AbstractCatAction.class);
catActionMultibinder.addBinding().to(RestAllocationAction.class).asEagerSingleton();
@@ -222,7 +225,8 @@ public class RestActionModule extends AbstractModule {
catActionMultibinder.addBinding().to(RestSegmentsAction.class).asEagerSingleton();
// Fully qualified to prevent interference with rest.action.count.RestCountAction
catActionMultibinder.addBinding().to(org.elasticsearch.rest.action.cat.RestCountAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestRecoveryAction.class).asEagerSingleton();
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
catActionMultibinder.addBinding().to(org.elasticsearch.rest.action.cat.RestRecoveryAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestHealthAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction.class).asEagerSingleton();
catActionMultibinder.addBinding().to(RestAliasAction.class).asEagerSingleton();


@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.recovery;
import java.io.IOException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.action.support.RestActions.buildBroadcastShardsHeader;
/**
* REST handler to report on index recoveries.
*/
public class RestRecoveryAction extends BaseRestHandler {
@Inject
public RestRecoveryAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(GET, "/_recovery", this);
controller.registerHandler(GET, "/{index}/_recovery", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
recoveryRequest.listenerThreaded(false);
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
client.admin().indices().recoveries(recoveryRequest, new ActionListener<RecoveryResponse>() {
@Override
public void onResponse(RecoveryResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
if (response.hasRecoveries()) {
response.detailed(recoveryRequest.detailed());
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
}
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException ioe) {
logger.error("Failed to send failure response", ioe);
}
}
});
}
}
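The handler above registers two routes, `/_recovery` for cluster-wide status and `/{index}/_recovery` for specific indices, and reads the optional `detailed` and `active_only` parameters. A hedged sketch of assembling a matching request URL (hypothetical helper, not part of this commit):

```java
public class RecoveryUrlSketch {
    // Builds a URL for the recovery endpoint; an empty index list yields the
    // cluster-wide "/_recovery" route, otherwise "/{index}/_recovery".
    public static String build(String host, String indices, boolean detailed, boolean activeOnly) {
        String path = (indices == null || indices.isEmpty()) ? "/_recovery" : "/" + indices + "/_recovery";
        return "http://" + host + path + "?detailed=" + detailed + "&active_only=" + activeOnly;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9200", "index1,index2", true, false));
        // http://localhost:9200/index1,index2/_recovery?detailed=true&active_only=false
    }
}
```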


@@ -20,25 +20,24 @@
package org.elasticsearch.rest.action.cat;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.status.ShardStatus;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestTable;
import org.apache.lucene.util.CollectionUtil;
import java.io.IOException;
import java.util.*;
@@ -66,57 +65,25 @@ public class RestRecoveryAction extends AbstractCatAction {
@Override
public void doRequest(final RestRequest request, final RestChannel channel) {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear().nodes(true);
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
final RecoveryRequest recoveryRequest = new RecoveryRequest(Strings.splitStringByCommaToArray(request.param("index")));
recoveryRequest.detailed(request.paramAsBoolean("detailed", false));
recoveryRequest.activeOnly(request.paramAsBoolean("active_only", false));
recoveryRequest.listenerThreaded(false);
recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions()));
client.admin().indices().recoveries(recoveryRequest, new ActionListener<RecoveryResponse>() {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(indices);
indicesStatusRequest.recovery(true);
indicesStatusRequest.operationThreading(BroadcastOperationThreading.SINGLE_THREAD);
client.admin().indices().status(indicesStatusRequest, new ActionListener<IndicesStatusResponse>() {
@Override
public void onResponse(IndicesStatusResponse indicesStatusResponse) {
Map<String, Long> primarySizes = new HashMap<String, Long>();
Set<ShardStatus> replicas = new HashSet<ShardStatus>();
// Loop through all the shards in the index status, keeping
// track of the primary shard size with a Map and the
// recovering shards in a Set of ShardStatus objects
for (ShardStatus shardStatus : indicesStatusResponse.getShards()) {
if (shardStatus.getShardRouting().primary()) {
primarySizes.put(shardStatus.getShardRouting().getIndex() + shardStatus.getShardRouting().getId(),
shardStatus.getStoreSize().bytes());
} else if (shardStatus.getState() == IndexShardState.RECOVERING) {
replicas.add(shardStatus);
}
}
try {
channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(request, clusterStateResponse, primarySizes, replicas), request, channel));
} catch (Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e2) {
logger.error("Unable to send recovery status response", e2);
}
}
public void onResponse(final RecoveryResponse response) {
try {
channel.sendResponse(RestTable.buildResponse(buildRecoveryTable(request, response), request, channel));
} catch (Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e2) {
logger.error("Unable to send recovery status response", e2);
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
@Override
@@ -128,20 +95,25 @@ public class RestRecoveryAction extends AbstractCatAction {
}
}
});
}
@Override
Table getTableWithHeader(RestRequest request) {
Table t = new Table();
t.startHeaders().addCell("index", "alias:i,idx;desc:index name")
t.startHeaders()
.addCell("index", "alias:i,idx;desc:index name")
.addCell("shard", "alias:s,sh;desc:shard name")
.addCell("target", "alias:t;text-align:right;desc:bytes of source shard")
.addCell("recovered", "alias:r;text-align:right;desc:bytes recovered so far")
.addCell("percent", "alias:per,ratio;text-align:right;desc:percent recovered so far")
.addCell("host", "alias:h;desc:node host where source shard lives")
.addCell("ip", "desc:node ip where source shard lives")
.addCell("node", "alias:n;desc:node name where source shard lives")
.addCell("time", "alias:t,ti;desc:recovery time")
.addCell("type", "alias:ty;desc:recovery type")
.addCell("stage", "alias:st;desc:recovery stage")
.addCell("source_host", "alias:shost;desc:source host")
.addCell("target_host", "alias:thost;desc:target host")
.addCell("repository", "alias:rep;desc:repository")
.addCell("snapshot", "alias:snap;desc:snapshot")
.addCell("files", "alias:f;desc:number of files")
.addCell("files_percent", "alias:fp;desc:percent of files recovered")
.addCell("bytes", "alias:b;desc:size in bytes")
.addCell("bytes_percent", "alias:bp;desc:percent of bytes recovered")
.endHeaders();
return t;
}
@@ -150,32 +122,62 @@ public class RestRecoveryAction extends AbstractCatAction {
* buildRecoveryTable will build a table of recovery information suitable
* for displaying at the command line.
*
* @param request
* @param state Current cluster state.
* @param primarySizes A Map of {@code index + shardId} strings to store size for all primary shards.
* @param recoveringReplicas A Set of {@link org.elasticsearch.action.admin.indices.status.ShardStatus} objects for each recovering replica to be displayed.
* @param request A Rest request
* @param response A recovery status response
* @return A table containing index, shardId, node, target size, recovered size and percentage for each recovering replica
*/
public Table buildRecoveryTable(RestRequest request, ClusterStateResponse state, Map<String, Long> primarySizes, Set<ShardStatus> recoveringReplicas) {
Table t = getTableWithHeader(request);
for (ShardStatus status : recoveringReplicas) {
DiscoveryNode node = state.getState().nodes().get(status.getShardRouting().currentNodeId());
public Table buildRecoveryTable(RestRequest request, RecoveryResponse response) {
String index = status.getShardRouting().getIndex();
int id = status.getShardId();
long replicaSize = status.getStoreSize().bytes();
Long primarySize = primarySizes.get(index + id);
t.startRow();
t.addCell(index);
t.addCell(id);
t.addCell(primarySize);
t.addCell(replicaSize);
t.addCell(primarySize == null ? null : String.format(Locale.ROOT, "%1.1f%%", 100.0 * (float) replicaSize / primarySize));
t.addCell(node == null ? null : node.getHostName());
t.addCell(node == null ? null : node.getHostAddress());
t.addCell(node == null ? null : node.name());
t.endRow();
Table t = getTableWithHeader(request);
for (String index : response.shardResponses().keySet()) {
List<ShardRecoveryResponse> shardRecoveryResponses = response.shardResponses().get(index);
if (shardRecoveryResponses.size() == 0) {
continue;
}
// Sort ascending by shard id for readability
CollectionUtil.introSort(shardRecoveryResponses, new Comparator<ShardRecoveryResponse>() {
@Override
public int compare(ShardRecoveryResponse o1, ShardRecoveryResponse o2) {
int id1 = o1.recoveryState().getShardId().id();
int id2 = o2.recoveryState().getShardId().id();
if (id1 < id2) {
return -1;
} else if (id1 > id2) {
return 1;
} else {
return 0;
}
}
});
for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) {
RecoveryState state = shardResponse.recoveryState();
int filesRecovered = state.getIndex().recoveredFileCount();
long bytesRecovered = state.getIndex().recoveredByteCount();
t.startRow();
t.addCell(index);
t.addCell(shardResponse.getShardId());
t.addCell(state.getTimer().time());
t.addCell(state.getType().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getSourceNode() == null ? "n/a" : state.getSourceNode().getHostName());
t.addCell(state.getTargetNode().getHostName());
t.addCell(state.getRestoreSource() == null ? "n/a" : state.getRestoreSource().snapshotId().getRepository());
t.addCell(state.getRestoreSource() == null ? "n/a" : state.getRestoreSource().snapshotId().getSnapshot());
t.addCell(state.getIndex().totalFileCount());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().percentFilesRecovered(filesRecovered)));
t.addCell(state.getIndex().totalByteCount());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().percentBytesRecovered(bytesRecovered)));
t.endRow();
}
}
return t;
}
}


@@ -0,0 +1,270 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
/**
*
*/
@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0)
public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
private static final String INDEX_NAME = "test-idx-1";
private static final String REPO_NAME = "test-repo-1";
private static final String SNAP_NAME = "test-snap-1";
private static final int DOC_COUNT = 100;
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 0;
@Test
public void gatewayRecoveryTest() throws Exception {
logger.info("--> start nodes");
String node = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
logger.info("--> restarting cluster");
cluster().fullRestart();
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
assertThat(response.shardResponses().size(), equalTo(SHARD_COUNT));
assertThat(response.shardResponses().get(INDEX_NAME).size(), equalTo(1));
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(INDEX_NAME);
assertThat(shardResponses.size(), equalTo(1));
ShardRecoveryResponse shardResponse = shardResponses.get(0);
RecoveryState state = shardResponse.recoveryState();
assertThat(state.getType(), equalTo(RecoveryState.Type.GATEWAY));
assertThat(state.getStage(), equalTo(RecoveryState.Stage.DONE));
assertThat(node, equalTo(state.getSourceNode().getName()));
assertThat(node, equalTo(state.getTargetNode().getName()));
assertNull(state.getRestoreSource());
}
@Test
public void gatewayRecoveryTestActiveOnly() throws Exception {
logger.info("--> start nodes");
cluster().startNode(settingsBuilder().put("gateway.type", "local"));
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
logger.info("--> restarting cluster");
cluster().fullRestart();
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).execute().actionGet();
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(INDEX_NAME);
assertThat(shardResponses.size(), equalTo(0)); // no recoveries are in flight after ensureGreen(), so active-only returns nothing
}
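The active-only test above relies on `setActiveOnly(true)` dropping recoveries that have already completed. A minimal standalone sketch of that predicate, using simplified stand-in types rather than the real `ShardRecoveryResponse`/`RecoveryState` classes (the stage names other than `INDEX` and `DONE`, which appear in this change, are assumptions):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ActiveOnlySketch {
    // stand-in for RecoveryState.Stage; only INDEX and DONE are confirmed by the tests above
    enum Stage { INIT, INDEX, START, TRANSLOG, FINALIZE, DONE }

    // stand-in for a shard recovery entry
    record Recovery(int shardId, Stage stage) {}

    // mirrors setActiveOnly(true): keep only recoveries that have not finished
    static List<Recovery> activeOnly(List<Recovery> all) {
        return all.stream()
                .filter(r -> r.stage() != Stage.DONE)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Recovery> sample = List.of(
                new Recovery(0, Stage.DONE),
                new Recovery(1, Stage.INDEX));
        System.out.println(activeOnly(sample).size()); // 1
    }
}
```

With every shard in stage DONE, the filtered list is empty, which is exactly what the assertion in `gatewayRecoveryTestActiveOnly` checks.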
@Test
public void replicaRecoveryTest() throws Exception {
logger.info("--> start node A");
String nodeA = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
logger.info("--> create index on node: {}", nodeA);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
logger.info("--> start node B");
String nodeB = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
ensureGreen();
// force a shard recovery from nodeA to nodeB
logger.info("--> bump replica count");
client().admin().indices().prepareUpdateSettings(INDEX_NAME)
.setSettings(settingsBuilder().put("number_of_replicas", 1)).execute().actionGet();
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
// we should now have two total shards, one primary and one replica
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(INDEX_NAME);
assertThat(shardResponses.size(), equalTo(2));
List<ShardRecoveryResponse> nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses);
assertThat(nodeAResponses.size(), equalTo(1));
List<ShardRecoveryResponse> nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses);
assertThat(nodeBResponses.size(), equalTo(1));
// validate node A recovery
ShardRecoveryResponse nodeAShardResponse = nodeAResponses.get(0);
assertThat(nodeAShardResponse.recoveryState().getShardId().id(), equalTo(0));
assertThat(nodeAShardResponse.recoveryState().getSourceNode().getName(), equalTo(nodeA));
assertThat(nodeAShardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeA));
assertThat(nodeAShardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.GATEWAY));
assertThat(nodeAShardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
// validate node B recovery
ShardRecoveryResponse nodeBShardResponse = nodeBResponses.get(0);
assertThat(nodeBShardResponse.recoveryState().getShardId().id(), equalTo(0));
assertThat(nodeBShardResponse.recoveryState().getSourceNode().getName(), equalTo(nodeA));
assertThat(nodeBShardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeB));
assertThat(nodeBShardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.REPLICA));
assertThat(nodeBShardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
}
@Test
public void rerouteRecoveryTest() throws Exception {
logger.info("--> start node A");
String nodeA = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
logger.info("--> create index on node: {}", nodeA);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
logger.info("--> start node B");
String nodeB = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
ensureGreen();
logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId(INDEX_NAME, 0), nodeA, nodeB))
.execute().actionGet().getState();
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
List<ShardRecoveryResponse> shardResponses = response.shardResponses().get(INDEX_NAME);
assertThat(shardResponses.size(), equalTo(1));
ShardRecoveryResponse shardResponse = shardResponses.get(0);
RecoveryState state = shardResponse.recoveryState();
assertThat(state.getType(), equalTo(RecoveryState.Type.RELOCATION));
assertThat(state.getStage(), equalTo(RecoveryState.Stage.DONE));
assertThat(state.getSourceNode().getName(), equalTo(nodeA));
assertThat(state.getTargetNode().getName(), equalTo(nodeB));
assertNull(state.getRestoreSource());
}
@Test
public void snapshotRecoveryTest() throws Exception {
logger.info("--> start node A");
String nodeA = cluster().startNode(settingsBuilder().put("gateway.type", "local"));
logger.info("--> create repository");
assertAcked(client().admin().cluster().preparePutRepository(REPO_NAME)
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE))
.put("compress", false)
).get());
ensureGreen();
logger.info("--> create index on node: {}", nodeA);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(REPO_NAME, SNAP_NAME)
.setWaitForCompletion(true).setIndices(INDEX_NAME).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client().admin().cluster().prepareGetSnapshots(REPO_NAME).setSnapshots(SNAP_NAME).get()
.getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
client().admin().indices().prepareClose(INDEX_NAME).execute().actionGet();
logger.info("--> restore");
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
.prepareRestoreSnapshot(REPO_NAME, SNAP_NAME).setWaitForCompletion(true).execute().actionGet();
int totalShards = restoreSnapshotResponse.getRestoreInfo().totalShards();
assertThat(totalShards, greaterThan(0));
ensureGreen();
logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
for (Map.Entry<String, List<ShardRecoveryResponse>> shardRecoveryResponse : response.shardResponses().entrySet()) {
assertThat(shardRecoveryResponse.getKey(), equalTo(INDEX_NAME));
List<ShardRecoveryResponse> shardRecoveryResponses = shardRecoveryResponse.getValue();
assertThat(shardRecoveryResponses.size(), equalTo(totalShards));
for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) {
assertThat(shardResponse.recoveryState().getType(), equalTo(RecoveryState.Type.SNAPSHOT));
assertThat(shardResponse.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
assertNotNull(shardResponse.recoveryState().getRestoreSource());
assertThat(shardResponse.recoveryState().getTargetNode().getName(), equalTo(nodeA));
}
}
}
private List<ShardRecoveryResponse> findRecoveriesForTargetNode(String nodeName, List<ShardRecoveryResponse> responses) {
List<ShardRecoveryResponse> nodeResponses = new ArrayList<ShardRecoveryResponse>();
for (ShardRecoveryResponse response : responses) {
if (response.recoveryState().getTargetNode().getName().equals(nodeName)) {
nodeResponses.add(response);
}
}
return nodeResponses;
}
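The linear scan in `findRecoveriesForTargetNode` can also be written with Java streams. A minimal standalone sketch, using a simplified stand-in record instead of the real `ShardRecoveryResponse`/`DiscoveryNode` classes (the `Recovery` type below is an assumption for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;

public class RecoveryFilterSketch {
    // stand-in for ShardRecoveryResponse plus its target node's name
    record Recovery(int shardId, String targetNodeName) {}

    // same behavior as findRecoveriesForTargetNode: keep entries targeting the given node
    static List<Recovery> forTargetNode(String nodeName, List<Recovery> responses) {
        return responses.stream()
                .filter(r -> r.targetNodeName().equals(nodeName))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Recovery> all = List.of(
                new Recovery(0, "node_A"),
                new Recovery(0, "node_B"),
                new Recovery(1, "node_A"));
        System.out.println(forTargetNode("node_A", all).size()); // 2
    }
}
```

The imperative loop in the test is kept as-is above; this is just the equivalent filtering expressed declaratively.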
private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, int shardCount, int replicaCount) {
logger.info("--> creating test index: {}", name);
assertAcked(prepareCreate(name, nodeCount, settingsBuilder().put("number_of_shards", shardCount)
.put("number_of_replicas", replicaCount)));
ensureGreen();
logger.info("--> indexing sample data");
for (int i = 0; i < DOC_COUNT; i++) {
index(INDEX_NAME, "x", Integer.toString(i), "foo-" + i, "bar-" + i);
}
refresh();
assertThat(client().prepareCount(INDEX_NAME).get().getCount(), equalTo((long) DOC_COUNT));
return client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet();
}
}