Acknowledge index deletion requests based on standard cluster state acknowledgment (#18602)
Index deletion requests currently use a custom acknowledgement mechanism that waits for the data nodes to actually delete the data before acknowledging the request to the client. This was originally needed because a new index with the same name could only be created once the old index's data was wiped, since the index name was used as the data folder name on the data nodes. Since PR #16442 we use the index UUID as the folder name, which avoids collisions between indices with the same name (for example an index that is deleted and then recreated). This allows us to drop the custom acknowledgment mechanism altogether and rely on the standard cluster state-based acknowledgment instead.

Closes #18558
This commit is contained in:
parent f32f35bec4
commit bdd1d0703d
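What changes for callers: the acknowledged flag of the delete-index response now means that the nodes applied the cluster state that removes the index within the ack timeout, not that the index data has already been wiped from disk (store deletion happens asynchronously on the data nodes). Below is a minimal client-side sketch of checking that flag, assuming a 5.x-era Client instance; it is editorial and not part of this commit, and the class and method names are illustrative.

import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class DeleteIndexExample {
    // Deletes the given index and reports whether the removal was acknowledged in time.
    public static boolean deleteIndex(Client client, String indexName) {
        DeleteIndexResponse response = client.admin().indices()
            .prepareDelete(indexName)
            .setTimeout(TimeValue.timeValueSeconds(30))            // ack timeout for the cluster state update
            .setMasterNodeTimeout(TimeValue.timeValueSeconds(30))  // time to wait for an elected master
            .get();
        // false only means the ack timeout elapsed before all nodes applied the new cluster state;
        // the index is still removed from the cluster state and its store is cleaned up asynchronously.
        return response.isAcknowledged();
    }
}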
@@ -0,0 +1,31 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.delete;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;

/**
 * Cluster state update request that allows to delete one or more indices
 */
public class DeleteIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<DeleteIndexClusterStateUpdateRequest> {

    DeleteIndexClusterStateUpdateRequest() {

    }
}
@@ -23,26 +23,22 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;

import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;

/**
 * A request to delete an index. Best created with {@link org.elasticsearch.client.Requests#deleteIndexRequest(String)}.
 */
public class DeleteIndexRequest extends MasterNodeRequest<DeleteIndexRequest> implements IndicesRequest.Replaceable {
public class DeleteIndexRequest extends AcknowledgedRequest<DeleteIndexRequest> implements IndicesRequest.Replaceable {

    private String[] indices;
    // Delete index should work by default on both open and closed indices.
    private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, true, true);
    private TimeValue timeout = AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;

    public DeleteIndexRequest() {
    }

@@ -98,37 +94,11 @@ public class DeleteIndexRequest extends MasterNodeRequest<DeleteIndexRequest> im
        return indices;
    }

    /**
     * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
     * to <tt>10s</tt>.
     */
    public TimeValue timeout() {
        return timeout;
    }

    /**
     * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
     * to <tt>10s</tt>.
     */
    public DeleteIndexRequest timeout(TimeValue timeout) {
        this.timeout = timeout;
        return this;
    }

    /**
     * Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
     * to <tt>10s</tt>.
     */
    public DeleteIndexRequest timeout(String timeout) {
        return timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"));
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        indices = in.readStringArray();
        indicesOptions = IndicesOptions.readIndicesOptions(in);
        timeout = readTimeValue(in);
    }

    @Override

@@ -136,6 +106,5 @@ public class DeleteIndexRequest extends MasterNodeRequest<DeleteIndexRequest> im
        super.writeTo(out);
        out.writeStringArray(indices);
        indicesOptions.writeIndicesOptions(out);
        timeout.writeTo(out);
    }
}
@@ -24,6 +24,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

@@ -85,15 +86,21 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
            listener.onResponse(new DeleteIndexResponse(true));
            return;
        }
        deleteIndexService.deleteIndices(new MetaDataDeleteIndexService.Request(concreteIndices).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {

        DeleteIndexClusterStateUpdateRequest deleteRequest = new DeleteIndexClusterStateUpdateRequest()
            .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
            .indices(concreteIndices.toArray(new Index[concreteIndices.size()]));

        deleteIndexService.deleteIndices(deleteRequest, new ActionListener<ClusterStateUpdateResponse>() {

            @Override
            public void onResponse(MetaDataDeleteIndexService.Response response) {
                listener.onResponse(new DeleteIndexResponse(response.acknowledged()));
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new DeleteIndexResponse(response.isAcknowledged()));
            }

            @Override
            public void onFailure(Throwable t) {
                logger.debug("failed to delete indices [{}]", t, concreteIndices);
                listener.onFailure(t);
            }
        });
@@ -18,7 +18,6 @@
 */
package org.elasticsearch.action.support.master;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -20,7 +20,6 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

@@ -155,7 +154,6 @@ public class ClusterModule extends AbstractModule {
        bind(RoutingService.class).asEagerSingleton();
        bind(DelayedAllocationService.class).asEagerSingleton();
        bind(ShardStateAction.class).asEagerSingleton();
        bind(NodeIndexDeletedAction.class).asEagerSingleton();
        bind(NodeMappingRefreshAction.class).asEagerSingleton();
        bind(MappingUpdatedAction.class).asEagerSingleton();
        bind(TaskResultsService.class).asEagerSingleton();
@@ -1,199 +0,0 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.action.index;

import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/**
 *
 */
public class NodeIndexDeletedAction extends AbstractComponent {

    public static final String INDEX_DELETED_ACTION_NAME = "internal:cluster/node/index/deleted";
    public static final String INDEX_STORE_DELETED_ACTION_NAME = "internal:cluster/node/index_store/deleted";

    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final IndicesService indicesService;

    @Inject
    public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
        transportService.registerRequestHandler(INDEX_DELETED_ACTION_NAME, NodeIndexDeletedMessage::new, ThreadPool.Names.SAME, new NodeIndexDeletedTransportHandler());
        transportService.registerRequestHandler(INDEX_STORE_DELETED_ACTION_NAME, NodeIndexStoreDeletedMessage::new, ThreadPool.Names.SAME, new NodeIndexStoreDeletedTransportHandler());
        this.indicesService = indicesService;
    }

    public void add(Listener listener) {
        listeners.add(listener);
    }

    public void remove(Listener listener) {
        listeners.remove(listener);
    }

    public void nodeIndexDeleted(final ClusterState clusterState, final Index index, final IndexSettings indexSettings, final String nodeId) {
        final DiscoveryNodes nodes = clusterState.nodes();
        transportService.sendRequest(clusterState.nodes().getMasterNode(),
            INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
        if (nodes.getLocalNode().isDataNode() == false) {
            logger.trace("[{}] not acking store deletion (not a data node)", index);
            return;
        }
        threadPool.generic().execute(new AbstractRunnable() {
            @Override
            public void onFailure(Throwable t) {
                logger.warn("[{}] failed to ack index store deleted for index", t, index);
            }

            @Override
            protected void doRun() throws Exception {
                lockIndexAndAck(index, nodes, nodeId, clusterState, indexSettings);
            }
        });
    }

    private void lockIndexAndAck(Index index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState, IndexSettings indexSettings) throws IOException {
        try {
            // we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the
            // master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock
            // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
            // deleted by the time we get the lock
            indicesService.processPendingDeletes(indexSettings.getIndex(), indexSettings, new TimeValue(30, TimeUnit.MINUTES));
            transportService.sendRequest(clusterState.nodes().getMasterNode(),
                INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
        } catch (LockObtainFailedException exc) {
            logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
        } catch (InterruptedException e) {
            logger.warn("[{}] failed to lock all shards for index - interrupted", index);
        }
    }

    public interface Listener {
        void onNodeIndexDeleted(Index index, String nodeId);

        void onNodeIndexStoreDeleted(Index index, String nodeId);
    }

    private class NodeIndexDeletedTransportHandler implements TransportRequestHandler<NodeIndexDeletedMessage> {

        @Override
        public void messageReceived(NodeIndexDeletedMessage message, TransportChannel channel) throws Exception {
            for (Listener listener : listeners) {
                listener.onNodeIndexDeleted(message.index, message.nodeId);
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

    private class NodeIndexStoreDeletedTransportHandler implements TransportRequestHandler<NodeIndexStoreDeletedMessage> {

        @Override
        public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
            for (Listener listener : listeners) {
                listener.onNodeIndexStoreDeleted(message.index, message.nodeId);
            }
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }
    }

    public static class NodeIndexDeletedMessage extends TransportRequest {

        Index index;
        String nodeId;

        public NodeIndexDeletedMessage() {
        }

        NodeIndexDeletedMessage(Index index, String nodeId) {
            this.index = index;
            this.nodeId = nodeId;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            index.writeTo(out);
            out.writeString(nodeId);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            index = new Index(in);
            nodeId = in.readString();
        }
    }

    public static class NodeIndexStoreDeletedMessage extends TransportRequest {

        Index index;
        String nodeId;

        public NodeIndexStoreDeletedMessage() {
        }

        NodeIndexStoreDeletedMessage(Index index, String nodeId) {
            this.index = index;
            this.nodeId = nodeId;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            index.writeTo(out);
            out.writeString(nodeId);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            index = new Index(in);
            nodeId = in.readString();
        }
    }
}
@@ -219,8 +219,10 @@ final public class IndexGraveyard implements MetaData.Custom {
    /**
     * Add a set of deleted indexes to the list of tombstones in the cluster state.
     */
    public Builder addTombstones(final Set<Index> indices) {
        indices.stream().forEach(this::addTombstone);
    public Builder addTombstones(final Index[] indices) {
        for (Index index : indices) {
            addTombstone(index);
        }
        return this;
    }

@@ -19,10 +19,11 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;

@@ -32,16 +33,11 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**

@@ -49,46 +45,37 @@ import java.util.stream.Collectors;
 */
public class MetaDataDeleteIndexService extends AbstractComponent {

    private final ThreadPool threadPool;

    private final ClusterService clusterService;

    private final AllocationService allocationService;

    private final NodeIndexDeletedAction nodeIndexDeletedAction;

    @Inject
    public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService,
                                      NodeIndexDeletedAction nodeIndexDeletedAction) {
    public MetaDataDeleteIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
        super(settings);
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.allocationService = allocationService;
        this.nodeIndexDeletedAction = nodeIndexDeletedAction;
    }

    public void deleteIndices(final Request request, final Listener userListener) {
        final DeleteIndexListener listener = new DeleteIndexListener(userListener);
    public void deleteIndices(final DeleteIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        if (request.indices() == null || request.indices().length == 0) {
            throw new IllegalArgumentException("Index name is required");
        }

        clusterService.submitStateUpdateTask("delete-index " + request.indices, new ClusterStateUpdateTask(Priority.URGENT) {
        clusterService.submitStateUpdateTask("delete-index " + request.indices(),
            new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {

            @Override
            public TimeValue timeout() {
                return request.masterTimeout;
            }

            @Override
            public void onFailure(String source, Throwable t) {
                listener.onFailure(t);
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }

            @Override
            public ClusterState execute(final ClusterState currentState) {
                final MetaData meta = currentState.metaData();
                final Set<IndexMetaData> metaDatas = request.indices.stream().map(i -> meta.getIndexSafe(i)).collect(Collectors.toSet());
                final Index[] indices = request.indices();
                final Set<IndexMetaData> metaDatas = Arrays.asList(indices).stream().map(i -> meta.getIndexSafe(i)).collect(Collectors.toSet());
                // Check if index deletion conflicts with any running snapshots
                SnapshotsService.checkIndexDeletion(currentState, metaDatas);
                final Set<Index> indices = request.indices;
                RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
                MetaData.Builder metaDataBuilder = MetaData.builder(meta);
                ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

@@ -108,40 +95,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
                logger.trace("{} tombstones purged from the cluster state. Previous tombstone size: {}. Current tombstone size: {}.",
                    graveyardBuilder.getNumPurged(), previousGraveyardSize, currentGraveyard.getTombstones().size());

                // wait for events from all nodes that it has been removed from their respective metadata...
                int count = currentState.nodes().getSize();
                // add the notifications that the store was deleted from *data* nodes
                count += currentState.nodes().getDataNodes().size();
                final AtomicInteger counter = new AtomicInteger(count * indices.size());

                // this listener will be notified once we get back a notification based on the cluster state change below.
                final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
                    @Override
                    public void onNodeIndexDeleted(Index deleted, String nodeId) {
                        if (indices.contains(deleted)) {
                            if (counter.decrementAndGet() == 0) {
                                listener.onResponse(new Response(true));
                                nodeIndexDeletedAction.remove(this);
                            }
                        }
                    }

                    @Override
                    public void onNodeIndexStoreDeleted(Index deleted, String nodeId) {
                        if (indices.contains(deleted)) {
                            if (counter.decrementAndGet() == 0) {
                                listener.onResponse(new Response(true));
                                nodeIndexDeletedAction.remove(this);
                            }
                        }
                    }
                };
                nodeIndexDeletedAction.add(nodeIndexDeleteListener);
                listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, () -> {
                    listener.onResponse(new Response(false));
                    nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
                });

                MetaData newMetaData = metaDataBuilder.build();
                ClusterBlocks blocks = clusterBlocksBuilder.build();
                RoutingAllocation.Result routingResult = allocationService.reroute(

@@ -149,78 +102,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
                    "deleted indices [" + indices + "]");
                return ClusterState.builder(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            }
        });
    }

    class DeleteIndexListener implements Listener {

        private final AtomicBoolean notified = new AtomicBoolean();
        private final Listener listener;
        volatile ScheduledFuture<?> future;

        private DeleteIndexListener(Listener listener) {
            this.listener = listener;
        }

        @Override
        public void onResponse(final Response response) {
            if (notified.compareAndSet(false, true)) {
                FutureUtils.cancel(future);
                listener.onResponse(response);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (notified.compareAndSet(false, true)) {
                FutureUtils.cancel(future);
                listener.onFailure(t);
            }
        }
    }

    public interface Listener {

        void onResponse(Response response);

        void onFailure(Throwable t);
    }

    public static class Request {

        final Set<Index> indices;

        TimeValue timeout = TimeValue.timeValueSeconds(10);
        TimeValue masterTimeout = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;

        public Request(Set<Index> indices) {
            this.indices = indices;
        }

        public Request timeout(TimeValue timeout) {
            this.timeout = timeout;
            return this;
        }

        public Request masterTimeout(TimeValue masterTimeout) {
            this.masterTimeout = masterTimeout;
            return this;
        }
    }

    public static class Response {
        private final boolean acknowledged;

        public Response(boolean acknowledged) {
            this.acknowledged = acknowledged;
        }

        public boolean acknowledged() {
            return acknowledged;
        }
    }
}
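For reference, the mechanism the new code relies on is the generic acked cluster state update task shown in the hunks above. The following self-contained sketch distills that pattern; it is editorial and not part of this commit, the class name, method name, and task source string are illustrative, and package names follow the 5.x-era layout.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;

public class AckedUpdateExample {
    // Submits a cluster state update whose response is acknowledged once all nodes
    // have applied the published state within the request's ack timeout.
    public static void submit(ClusterService clusterService, AckedRequest request,
                              ActionListener<ClusterStateUpdateResponse> listener) {
        clusterService.submitStateUpdateTask("example-acked-task",
            new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
                @Override
                protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                    // acknowledged is true only if every relevant node applied the new cluster
                    // state in time; otherwise the listener still gets a response, just with false
                    return new ClusterStateUpdateResponse(acknowledged);
                }

                @Override
                public ClusterState execute(ClusterState currentState) {
                    // a real task would return a modified cluster state here
                    return currentState;
                }
            });
    }
}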
@@ -24,14 +24,12 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;

@@ -48,7 +46,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;

@@ -89,7 +86,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
    private final ThreadPool threadPool;
    private final RecoveryTargetService recoveryTargetService;
    private final ShardStateAction shardStateAction;
    private final NodeIndexDeletedAction nodeIndexDeletedAction;
    private final NodeMappingRefreshAction nodeMappingRefreshAction;
    private final NodeServicesProvider nodeServicesProvider;

@@ -112,7 +108,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
    public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
                                      ThreadPool threadPool, RecoveryTargetService recoveryTargetService,
                                      ShardStateAction shardStateAction,
                                      NodeIndexDeletedAction nodeIndexDeletedAction,
                                      NodeMappingRefreshAction nodeMappingRefreshAction,
                                      RepositoriesService repositoriesService, RestoreService restoreService,
                                      SearchService searchService, SyncedFlushService syncedFlushService,

@@ -124,7 +119,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
        this.threadPool = threadPool;
        this.recoveryTargetService = recoveryTargetService;
        this.shardStateAction = shardStateAction;
        this.nodeIndexDeletedAction = nodeIndexDeletedAction;
        this.nodeMappingRefreshAction = nodeMappingRefreshAction;
        this.restoreService = restoreService;
        this.repositoriesService = repositoriesService;

@@ -219,14 +213,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                logger.debug("[{}] cleaning index, no longer part of the metadata", index);
            }
            final IndexService idxService = indicesService.indexService(index);
            final IndexSettings indexSettings;
            if (idxService != null) {
                indexSettings = idxService.getIndexSettings();
                deleteIndex(index, "index no longer part of the metadata");
            } else if (previousState.metaData().hasIndex(index.getName())) {
                // The deleted index was part of the previous cluster state, but not loaded on the local node
                final IndexMetaData metaData = previousState.metaData().index(index);
                indexSettings = new IndexSettings(metaData, settings);
                indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, event.state());
            } else {
                // The previous cluster state's metadata also does not contain the index,

@@ -236,21 +227,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                // First, though, verify the precondition for applying this case by
                // asserting that the previous cluster state is not initialized/recovered.
                assert previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
                final IndexMetaData metaData = indicesService.verifyIndexIsDeleted(index, event.state());
                if (metaData != null) {
                    indexSettings = new IndexSettings(metaData, settings);
                } else {
                    indexSettings = null;
                }
            }
            // indexSettings can only be null if there was no IndexService and no metadata existed
            // on disk for this index, so it won't need to go through the node deleted action anyway
            if (indexSettings != null) {
                try {
                    nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, indexSettings, localNodeId);
                } catch (Exception e) {
                    logger.debug("failed to send to master index {} deleted event", e, index);
                }
                indicesService.verifyIndexIsDeleted(index, event.state());
            }
        }

@@ -105,7 +105,6 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
            indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()));
    }

    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/18558")
    public void testDeleteIndexStore() throws Exception {
        IndicesService indicesService = getIndicesService();
        IndexService test = createIndex("test");