simplify gateway snapshot operation
This commit is contained in:
parent
19ddee5ec9
commit
e5b041c8ef
|
@ -36,8 +36,6 @@ import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
|||
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportIndexGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportShardGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
|
||||
import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction;
|
||||
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
|
||||
|
@ -83,8 +81,6 @@ public class TransportActionModule extends AbstractModule {
|
|||
bind(TransportIndicesAliasesAction.class).asEagerSingleton();
|
||||
bind(TransportUpdateSettingsAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportShardGatewaySnapshotAction.class).asEagerSingleton();
|
||||
bind(TransportIndexGatewaySnapshotAction.class).asEagerSingleton();
|
||||
bind(TransportGatewaySnapshotAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportRefreshAction.class).asEagerSingleton();
|
||||
|
|
|
@ -19,8 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
|
||||
/**
|
||||
* Gateway snapshot allows to explicitly perform a snapshot through the gateway of one or more indices (backup them).
|
||||
|
@ -32,7 +31,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
* @see org.elasticsearch.client.IndicesAdminClient#gatewaySnapshot(GatewaySnapshotRequest)
|
||||
* @see GatewaySnapshotResponse
|
||||
*/
|
||||
public class GatewaySnapshotRequest extends IndicesReplicationOperationRequest {
|
||||
public class GatewaySnapshotRequest extends BroadcastOperationRequest {
|
||||
|
||||
GatewaySnapshotRequest() {
|
||||
|
||||
|
@ -53,13 +52,4 @@ public class GatewaySnapshotRequest extends IndicesReplicationOperationRequest {
|
|||
super.listenerThreaded(threadedListener);
|
||||
return this;
|
||||
}
|
||||
|
||||
public GatewaySnapshotRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public GatewaySnapshotRequest timeout(String timeout) {
|
||||
return timeout(TimeValue.parseTimeValue(timeout, null));
|
||||
}
|
||||
}
|
|
@ -19,67 +19,34 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Reponse for the gateway snapshot action.
|
||||
* Response for the gateway snapshot action.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class GatewaySnapshotResponse implements ActionResponse, Streamable, Iterable<IndexGatewaySnapshotResponse> {
|
||||
|
||||
private Map<String, IndexGatewaySnapshotResponse> indices = new HashMap<String, IndexGatewaySnapshotResponse>();
|
||||
public class GatewaySnapshotResponse extends BroadcastOperationResponse {
|
||||
|
||||
GatewaySnapshotResponse() {
|
||||
|
||||
}
|
||||
|
||||
@Override public Iterator<IndexGatewaySnapshotResponse> iterator() {
|
||||
return indices.values().iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* A map of index level responses of the gateway snapshot operation.
|
||||
*/
|
||||
public Map<String, IndexGatewaySnapshotResponse> indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* A map of index level responses of the gateway snapshot operation.
|
||||
*/
|
||||
public Map<String, IndexGatewaySnapshotResponse> getIndices() {
|
||||
return indices();
|
||||
}
|
||||
|
||||
/**
|
||||
* The index level gateway snapshot response for the given index.
|
||||
*/
|
||||
public IndexGatewaySnapshotResponse index(String index) {
|
||||
return indices.get(index);
|
||||
GatewaySnapshotResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
|
||||
super(totalShards, successfulShards, failedShards, shardFailures);
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
IndexGatewaySnapshotResponse response = new IndexGatewaySnapshotResponse();
|
||||
response.readFrom(in);
|
||||
indices.put(response.index(), response);
|
||||
}
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(indices.size());
|
||||
for (IndexGatewaySnapshotResponse indexGatewaySnapshotResponse : indices.values()) {
|
||||
indexGatewaySnapshotResponse.writeTo(out);
|
||||
}
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class IndexGatewaySnapshotRequest extends IndexReplicationOperationRequest {
|
||||
|
||||
IndexGatewaySnapshotRequest(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
IndexGatewaySnapshotRequest(GatewaySnapshotRequest request, String index) {
|
||||
this.index = index;
|
||||
this.timeout = request.timeout();
|
||||
}
|
||||
|
||||
IndexGatewaySnapshotRequest() {
|
||||
}
|
||||
|
||||
public IndexGatewaySnapshotRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An index level gateway snapshot response.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class IndexGatewaySnapshotResponse implements ActionResponse, Streamable {
|
||||
|
||||
private String index;
|
||||
|
||||
private int successfulShards;
|
||||
|
||||
private int failedShards;
|
||||
|
||||
IndexGatewaySnapshotResponse(String index, int successfulShards, int failedShards) {
|
||||
this.index = index;
|
||||
this.successfulShards = successfulShards;
|
||||
this.failedShards = failedShards;
|
||||
}
|
||||
|
||||
IndexGatewaySnapshotResponse() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the gateway snapshot has performed on.
|
||||
*/
|
||||
public String index() {
|
||||
return index;
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the gateway snapshot has performed on.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return index();
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of successful shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int successfulShards() {
|
||||
return successfulShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of successful shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int getSuccessfulShards() {
|
||||
return successfulShards();
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of failed shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int failedShards() {
|
||||
return failedShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of failed shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int getFailedShards() {
|
||||
return failedShards();
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of total shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int totalShards() {
|
||||
return successfulShards + failedShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of total shards the gateway snapshot operation was performed on.
|
||||
*/
|
||||
public int getTotalShards() {
|
||||
return totalShards();
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
index = in.readUTF();
|
||||
successfulShards = in.readVInt();
|
||||
failedShards = in.readVInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeUTF(index);
|
||||
out.writeVInt(successfulShards);
|
||||
out.writeVInt(failedShards);
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
|
@ -28,38 +28,20 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
class ShardGatewaySnapshotRequest extends ShardReplicationOperationRequest {
|
||||
|
||||
private int shardId;
|
||||
|
||||
ShardGatewaySnapshotRequest(IndexGatewaySnapshotRequest request, int shardId) {
|
||||
this(request.index(), shardId);
|
||||
timeout = request.timeout();
|
||||
}
|
||||
|
||||
ShardGatewaySnapshotRequest(String index, int shardId) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
class ShardGatewaySnapshotRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
ShardGatewaySnapshotRequest() {
|
||||
}
|
||||
|
||||
public int shardId() {
|
||||
return this.shardId;
|
||||
public ShardGatewaySnapshotRequest(String index, int shardId) {
|
||||
super(index, shardId);
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
shardId = in.readVInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVInt(shardId);
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
return "gateway_snapshot {[" + index + "][" + shardId + "]}";
|
||||
}
|
||||
}
|
|
@ -19,25 +19,29 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
class ShardGatewaySnapshotResponse implements ActionResponse, Streamable {
|
||||
class ShardGatewaySnapshotResponse extends BroadcastShardOperationResponse {
|
||||
|
||||
ShardGatewaySnapshotResponse() {
|
||||
}
|
||||
|
||||
public ShardGatewaySnapshotResponse(String index, int shardId) {
|
||||
super(index, shardId);
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -19,50 +19,97 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewayService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class TransportGatewaySnapshotAction extends TransportIndicesReplicationOperationAction<GatewaySnapshotRequest, GatewaySnapshotResponse, IndexGatewaySnapshotRequest, IndexGatewaySnapshotResponse, ShardGatewaySnapshotRequest, ShardGatewaySnapshotResponse> {
|
||||
public class TransportGatewaySnapshotAction extends TransportBroadcastOperationAction<GatewaySnapshotRequest, GatewaySnapshotResponse, ShardGatewaySnapshotRequest, ShardGatewaySnapshotResponse> {
|
||||
|
||||
@Inject public TransportGatewaySnapshotAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, TransportIndexGatewaySnapshotAction indexAction) {
|
||||
super(settings, transportService, clusterService, threadPool, indexAction);
|
||||
}
|
||||
|
||||
@Override protected GatewaySnapshotRequest newRequestInstance() {
|
||||
return new GatewaySnapshotRequest();
|
||||
}
|
||||
|
||||
@Override protected GatewaySnapshotResponse newResponseInstance(GatewaySnapshotRequest request, AtomicReferenceArray indexResponses) {
|
||||
GatewaySnapshotResponse response = new GatewaySnapshotResponse();
|
||||
for (int i = 0; i < indexResponses.length(); i++) {
|
||||
IndexGatewaySnapshotResponse indexResponse = (IndexGatewaySnapshotResponse) indexResponses.get(i);
|
||||
if (indexResponse != null) {
|
||||
response.indices().put(indexResponse.index(), indexResponse);
|
||||
}
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
@Inject public TransportGatewaySnapshotAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, IndicesService indicesService) {
|
||||
super(settings, threadPool, clusterService, transportService, indicesService);
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return TransportActions.Admin.Indices.Gateway.SNAPSHOT;
|
||||
}
|
||||
|
||||
@Override protected IndexGatewaySnapshotRequest newIndexRequestInstance(GatewaySnapshotRequest request, String index) {
|
||||
return new IndexGatewaySnapshotRequest(request, index);
|
||||
@Override protected String transportShardAction() {
|
||||
return "indices/gateway/snapshot/shard";
|
||||
}
|
||||
|
||||
@Override protected GatewaySnapshotRequest newRequest() {
|
||||
return new GatewaySnapshotRequest();
|
||||
}
|
||||
|
||||
@Override protected boolean ignoreNonActiveExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override protected GatewaySnapshotResponse newResponse(GatewaySnapshotRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
List<ShardOperationFailedException> shardFailures = null;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
Object shardResponse = shardsResponses.get(i);
|
||||
if (shardResponse == null) {
|
||||
// non active shard, ignore
|
||||
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
|
||||
failedShards++;
|
||||
if (shardFailures == null) {
|
||||
shardFailures = Lists.newArrayList();
|
||||
}
|
||||
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new GatewaySnapshotResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotRequest newShardRequest() {
|
||||
return new ShardGatewaySnapshotRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotRequest newShardRequest(ShardRouting shard, GatewaySnapshotRequest request) {
|
||||
return new ShardGatewaySnapshotRequest(shard.index(), shard.id());
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotResponse newShardResponse() {
|
||||
return new ShardGatewaySnapshotResponse();
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotResponse shardOperation(ShardGatewaySnapshotRequest request) throws ElasticSearchException {
|
||||
IndexShardGatewayService shardGatewayService = indicesService.indexServiceSafe(request.index())
|
||||
.shardInjectorSafe(request.shardId()).getInstance(IndexShardGatewayService.class);
|
||||
shardGatewayService.snapshot("api");
|
||||
return new ShardGatewaySnapshotResponse(request.index(), request.shardId());
|
||||
}
|
||||
|
||||
/**
|
||||
* The refresh request works against *all* shards.
|
||||
*/
|
||||
@Override protected GroupShardsIterator shards(GatewaySnapshotRequest request, ClusterState clusterState) {
|
||||
return clusterState.routingTable().primaryShardsGrouped(request.indices());
|
||||
}
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class TransportIndexGatewaySnapshotAction extends TransportIndexReplicationOperationAction<IndexGatewaySnapshotRequest, IndexGatewaySnapshotResponse, ShardGatewaySnapshotRequest, ShardGatewaySnapshotResponse> {
|
||||
|
||||
@Inject public TransportIndexGatewaySnapshotAction(Settings settings, ClusterService clusterService,
|
||||
TransportService transportService, ThreadPool threadPool,
|
||||
TransportShardGatewaySnapshotAction shardGatewaySnapshotAction) {
|
||||
super(settings, transportService, clusterService, threadPool, shardGatewaySnapshotAction);
|
||||
}
|
||||
|
||||
@Override protected IndexGatewaySnapshotRequest newRequestInstance() {
|
||||
return new IndexGatewaySnapshotRequest();
|
||||
}
|
||||
|
||||
@Override protected IndexGatewaySnapshotResponse newResponseInstance(IndexGatewaySnapshotRequest request, AtomicReferenceArray shardsResponses) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
if (shardsResponses.get(i) == null) {
|
||||
failedShards++;
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new IndexGatewaySnapshotResponse(request.index(), successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/gateway/snapshot";
|
||||
}
|
||||
|
||||
@Override protected GroupShardsIterator shards(IndexGatewaySnapshotRequest request) {
|
||||
IndexRoutingTable indexRouting = clusterService.state().routingTable().index(request.index());
|
||||
if (indexRouting == null) {
|
||||
throw new IndexMissingException(new Index(request.index()));
|
||||
}
|
||||
return indexRouting.groupByShardsIt();
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotRequest newShardRequestInstance(IndexGatewaySnapshotRequest request, int shardId) {
|
||||
return new ShardGatewaySnapshotRequest(request, shardId);
|
||||
}
|
||||
}
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.gateway.snapshot;
|
||||
|
||||
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.gateway.IndexShardGatewayService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportShardGatewaySnapshotAction extends TransportShardReplicationOperationAction<ShardGatewaySnapshotRequest, ShardGatewaySnapshotResponse> {
|
||||
|
||||
@Inject public TransportShardGatewaySnapshotAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction) {
|
||||
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotRequest newRequestInstance() {
|
||||
return new ShardGatewaySnapshotRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotResponse newResponseInstance() {
|
||||
return new ShardGatewaySnapshotResponse();
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/shard/gateway/snapshot";
|
||||
}
|
||||
|
||||
@Override protected ShardGatewaySnapshotResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
|
||||
IndexShardGatewayService shardGatewayService = indicesService.indexServiceSafe(shardRequest.request.index())
|
||||
.shardInjectorSafe(shardRequest.shardId).getInstance(IndexShardGatewayService.class);
|
||||
shardGatewayService.snapshot("api");
|
||||
return new ShardGatewaySnapshotResponse();
|
||||
}
|
||||
|
||||
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
|
||||
// silently ignore, we disable it with #ignoreBackups anyhow
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, ShardGatewaySnapshotRequest request) {
|
||||
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot should only happen on primary shards.
|
||||
*/
|
||||
@Override protected boolean ignoreReplicas() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.action.admin.indices.gateway.snapshot.GatewaySnapshotRe
|
|||
import org.elasticsearch.action.admin.indices.gateway.snapshot.GatewaySnapshotResponse;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -40,16 +39,6 @@ public class GatewaySnapshotRequestBuilder extends BaseIndicesRequestBuilder<Gat
|
|||
return this;
|
||||
}
|
||||
|
||||
public GatewaySnapshotRequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public GatewaySnapshotRequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override protected void doExecute(ActionListener<GatewaySnapshotResponse> listener) {
|
||||
client.gatewaySnapshot(request, listener);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.rest.action.admin.indices.gateway.snapshot;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.GatewaySnapshotRequest;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.GatewaySnapshotResponse;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.IndexGatewaySnapshotResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -33,9 +32,9 @@ import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.*;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.*;
|
||||
import static org.elasticsearch.rest.RestResponse.Status.*;
|
||||
import static org.elasticsearch.rest.action.support.RestActions.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
|
@ -50,26 +49,16 @@ public class RestGatewaySnapshotAction extends BaseRestHandler {
|
|||
|
||||
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||
GatewaySnapshotRequest gatewaySnapshotRequest = new GatewaySnapshotRequest(RestActions.splitIndices(request.param("index")));
|
||||
gatewaySnapshotRequest.timeout(request.paramAsTime("timeout", DEFAULT_TIMEOUT));
|
||||
gatewaySnapshotRequest.listenerThreaded(false);
|
||||
client.admin().indices().gatewaySnapshot(gatewaySnapshotRequest, new ActionListener<GatewaySnapshotResponse>() {
|
||||
@Override public void onResponse(GatewaySnapshotResponse result) {
|
||||
@Override public void onResponse(GatewaySnapshotResponse response) {
|
||||
try {
|
||||
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
|
||||
builder.startObject();
|
||||
builder.field("ok", true);
|
||||
builder.startObject("indices");
|
||||
for (IndexGatewaySnapshotResponse indexResponse : result.indices().values()) {
|
||||
builder.startObject(indexResponse.index())
|
||||
.field("ok", true)
|
||||
.startObject("_shards")
|
||||
.field("total", indexResponse.totalShards())
|
||||
.field("successful", indexResponse.successfulShards())
|
||||
.field("failed", indexResponse.failedShards())
|
||||
.endObject()
|
||||
.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
buildBroadcastShardsHeader(builder, response);
|
||||
|
||||
builder.endObject();
|
||||
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||
} catch (Exception e) {
|
||||
|
|
Loading…
Reference in New Issue