Removes BWC snapshot status handler used in 6.x (#27443)
We introduced a new snapshot status update handler in 6.1.0. We will keep the old handler along with this new one in all 6.x. This commit removes the old handler from 7.0. Relates #27151
This commit is contained in:
parent
ce45e29be7
commit
4f711a828b
|
@ -23,7 +23,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
|
@ -68,10 +67,6 @@ import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||||
import org.elasticsearch.transport.TransportChannel;
|
|
||||||
import org.elasticsearch.transport.TransportRequest;
|
|
||||||
import org.elasticsearch.transport.TransportRequestHandler;
|
|
||||||
import org.elasticsearch.transport.TransportResponse;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -97,10 +92,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||||
*/
|
*/
|
||||||
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
|
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
|
||||||
|
|
||||||
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot";
|
|
||||||
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
|
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
|
||||||
|
|
||||||
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
private final IndicesService indicesService;
|
private final IndicesService indicesService;
|
||||||
|
@ -138,21 +131,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
|
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
|
||||||
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
|
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
|
||||||
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
|
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
|
||||||
|
|
||||||
if (DiscoveryNode.isMasterNode(settings)) {
|
|
||||||
// This needs to run only on nodes that can become masters
|
|
||||||
transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() {
|
protected void doStart() {
|
||||||
assert this.updateSnapshotStatusHandler != null;
|
assert this.updateSnapshotStatusHandler != null;
|
||||||
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
|
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
|
||||||
if (DiscoveryNode.isMasterNode(settings)) {
|
|
||||||
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -531,13 +515,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
*/
|
*/
|
||||||
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
|
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
|
||||||
try {
|
try {
|
||||||
if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
|
||||||
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
|
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||||
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
|
||||||
} else {
|
|
||||||
UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status);
|
|
||||||
transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
|
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
|
||||||
}
|
}
|
||||||
|
@ -651,78 +630,4 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A BWC version of {@link UpdateIndexShardSnapshotStatusRequest}
|
|
||||||
*/
|
|
||||||
static class UpdateSnapshotStatusRequestV6 extends TransportRequest {
|
|
||||||
private Snapshot snapshot;
|
|
||||||
private ShardId shardId;
|
|
||||||
private ShardSnapshotStatus status;
|
|
||||||
|
|
||||||
UpdateSnapshotStatusRequestV6() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
|
|
||||||
this.snapshot = snapshot;
|
|
||||||
this.shardId = shardId;
|
|
||||||
this.status = status;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
|
||||||
super.readFrom(in);
|
|
||||||
snapshot = new Snapshot(in);
|
|
||||||
shardId = ShardId.readShardId(in);
|
|
||||||
status = new ShardSnapshotStatus(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
|
||||||
super.writeTo(out);
|
|
||||||
snapshot.writeTo(out);
|
|
||||||
shardId.writeTo(out);
|
|
||||||
status.writeTo(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
Snapshot snapshot() {
|
|
||||||
return snapshot;
|
|
||||||
}
|
|
||||||
|
|
||||||
ShardId shardId() {
|
|
||||||
return shardId;
|
|
||||||
}
|
|
||||||
|
|
||||||
ShardSnapshotStatus status() {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A BWC version of {@link UpdateSnapshotStatusAction}
|
|
||||||
*/
|
|
||||||
class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> {
|
|
||||||
@Override
|
|
||||||
public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception {
|
|
||||||
final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status());
|
|
||||||
innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
logger.warn("Failed to update snapshot status", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue