diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f8a601cc41f..15f70e8b2c6 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; @@ -68,10 +67,6 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -97,10 +92,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; */ public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { - public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; - private final ClusterService clusterService; private final IndicesService indicesService; @@ -138,21 +131,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); - - if (DiscoveryNode.isMasterNode(settings)) { - // This needs to run only on nodes that can become masters - transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); - } - } @Override protected void doStart() { assert this.updateSnapshotStatusHandler != null; assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; - if (DiscoveryNode.isMasterNode(settings)) { - assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; - } } @Override @@ -531,13 +515,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements */ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { try { - if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); - transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } else { - UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); - transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); - } + UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } catch (Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); } @@ -651,78 +630,4 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } } - /** - * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} - */ - static class UpdateSnapshotStatusRequestV6 extends TransportRequest { - private Snapshot snapshot; - private ShardId shardId; - private ShardSnapshotStatus status; - - UpdateSnapshotStatusRequestV6() { - - } - - UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - snapshot = new Snapshot(in); - shardId = ShardId.readShardId(in); - status = new ShardSnapshotStatus(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - Snapshot snapshot() { - return snapshot; - } - - ShardId shardId() { - return shardId; - } - - ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - } - - /** - * A BWC version of {@link UpdateSnapshotStatusAction} - */ - class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler { - @Override - public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { - final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); - innerUpdateSnapshotState(request, new ActionListener() { - @Override - public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { - - } - - @Override - public void onFailure(Exception e) { - logger.warn("Failed to update snapshot status", e); - } - }); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - }