perform only one cluster state update per DeleteIndexRequest
This commit is contained in:
parent
4adf7c3d86
commit
efa6c0b37f
|
@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.node.settings.NodeSettingsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -77,42 +76,22 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
|
|||
|
||||
@Override
|
||||
protected void masterOperation(final DeleteIndexRequest request, final ClusterState state, final ActionListener<DeleteIndexResponse> listener) {
|
||||
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
|
||||
final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
|
||||
if (concreteIndices.length == 0) {
|
||||
listener.onResponse(new DeleteIndexResponse(true));
|
||||
return;
|
||||
}
|
||||
// TODO: this API should be improved, currently, if one delete index failed, we send a failure, we should send a response array that includes all the indices that were deleted
|
||||
final CountDown count = new CountDown(concreteIndices.length);
|
||||
for (final String index : concreteIndices) {
|
||||
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
|
||||
deleteIndexService.deleteIndices(new MetaDataDeleteIndexService.Request(concreteIndices).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
|
||||
|
||||
private volatile Throwable lastFailure;
|
||||
private volatile boolean ack = true;
|
||||
@Override
|
||||
public void onResponse(MetaDataDeleteIndexService.Response response) {
|
||||
listener.onResponse(new DeleteIndexResponse(response.acknowledged()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(MetaDataDeleteIndexService.Response response) {
|
||||
if (!response.acknowledged()) {
|
||||
ack = false;
|
||||
}
|
||||
if (count.countDown()) {
|
||||
if (lastFailure != null) {
|
||||
listener.onFailure(lastFailure);
|
||||
} else {
|
||||
listener.onResponse(new DeleteIndexResponse(ack));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.debug("[{}] failed to delete index", t, index);
|
||||
lastFailure = t;
|
||||
if (count.countDown()) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,9 +37,9 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -66,9 +66,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
|
||||
}
|
||||
|
||||
public void deleteIndex(final Request request, final Listener userListener) {
|
||||
public void deleteIndices(final Request request, final Listener userListener) {
|
||||
Collection<String> indices = Arrays.asList(request.indices);
|
||||
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
|
||||
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() {
|
||||
|
||||
clusterService.submitStateUpdateTask("delete-index " + indices, Priority.URGENT, new ClusterStateUpdateTask() {
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
|
@ -82,34 +84,32 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
if (!currentState.metaData().hasConcreteIndex(request.index)) {
|
||||
throw new IndexNotFoundException(request.index);
|
||||
}
|
||||
|
||||
logger.info("[{}] deleting index", request.index);
|
||||
|
||||
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
|
||||
routingTableBuilder.remove(request.index);
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
|
||||
MetaData newMetaData = MetaData.builder(currentState.metaData())
|
||||
.remove(request.index)
|
||||
.build();
|
||||
for (final String index: indices) {
|
||||
if (!currentState.metaData().hasConcreteIndex(index)) {
|
||||
throw new IndexNotFoundException(index);
|
||||
}
|
||||
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(
|
||||
ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).metaData(newMetaData).build());
|
||||
|
||||
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
|
||||
logger.debug("[{}] deleting index", index);
|
||||
|
||||
routingTableBuilder.remove(index);
|
||||
clusterBlocksBuilder.removeIndexBlocks(index);
|
||||
metaDataBuilder.remove(index);
|
||||
}
|
||||
// wait for events from all nodes that it has been removed from their respective metadata...
|
||||
int count = currentState.nodes().size();
|
||||
// add the notifications that the store was deleted from *data* nodes
|
||||
count += currentState.nodes().dataNodes().size();
|
||||
final AtomicInteger counter = new AtomicInteger(count);
|
||||
final AtomicInteger counter = new AtomicInteger(count * indices.size());
|
||||
|
||||
// this listener will be notified once we get back a notification based on the cluster state change below.
|
||||
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
|
||||
@Override
|
||||
public void onNodeIndexDeleted(String index, String nodeId) {
|
||||
if (index.equals(request.index)) {
|
||||
public void onNodeIndexDeleted(String deleted, String nodeId) {
|
||||
if (indices.contains(deleted)) {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
listener.onResponse(new Response(true));
|
||||
nodeIndexDeletedAction.remove(this);
|
||||
|
@ -118,8 +118,8 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onNodeIndexStoreDeleted(String index, String nodeId) {
|
||||
if (index.equals(request.index)) {
|
||||
public void onNodeIndexStoreDeleted(String deleted, String nodeId) {
|
||||
if (indices.contains(deleted)) {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
listener.onResponse(new Response(true));
|
||||
nodeIndexDeletedAction.remove(this);
|
||||
|
@ -128,15 +128,15 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
}
|
||||
};
|
||||
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
|
||||
|
||||
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
listener.onResponse(new Response(false));
|
||||
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
|
||||
}
|
||||
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, () -> {
|
||||
listener.onResponse(new Response(false));
|
||||
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
|
||||
});
|
||||
|
||||
MetaData newMetaData = metaDataBuilder.build();
|
||||
ClusterBlocks blocks = clusterBlocksBuilder.build();
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(
|
||||
ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).metaData(newMetaData).build());
|
||||
return ClusterState.builder(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
|
||||
}
|
||||
|
||||
|
@ -173,7 +173,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public interface Listener {
|
||||
|
||||
void onResponse(Response response);
|
||||
|
@ -183,13 +182,13 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
public static class Request {
|
||||
|
||||
final String index;
|
||||
final String[] indices;
|
||||
|
||||
TimeValue timeout = TimeValue.timeValueSeconds(10);
|
||||
TimeValue masterTimeout = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;
|
||||
|
||||
public Request(String index) {
|
||||
this.index = index;
|
||||
public Request(String[] indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
public Request timeout(TimeValue timeout) {
|
||||
|
|
Loading…
Reference in New Issue