diff --git a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 4a248d9d352..4031dcfa8a7 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -109,14 +109,18 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() { private volatile Throwable lastFailure; + private volatile boolean ack = true; @Override public void onResponse(MetaDataDeleteIndexService.Response response) { + if (!response.acknowledged()) { + ack = false; + } if (count.decrementAndGet() == 0) { if (lastFailure != null) { listener.onFailure(lastFailure); } else { - listener.onResponse(new DeleteIndexResponse(response.acknowledged())); + listener.onResponse(new DeleteIndexResponse(ack)); } } } diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index ac334afc506..c35afcfce90 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -54,6 +54,7 @@ public class NodeIndexDeletedAction extends AbstractComponent { this.transportService = transportService; this.clusterService = clusterService; transportService.registerHandler(NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedTransportHandler()); + transportService.registerHandler(NodeIndexStoreDeletedTransportHandler.ACTION, new NodeIndexStoreDeletedTransportHandler()); } public void add(Listener listener) { @@ -79,14 +80,37 @@ public class NodeIndexDeletedAction extends AbstractComponent { } } + public void nodeIndexStoreDeleted(final String index, final String nodeId) throws ElasticSearchException { + DiscoveryNodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + innerNodeIndexStoreDeleted(index, nodeId); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeIndexStoreDeletedTransportHandler.ACTION, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); + } + } + private void innerNodeIndexDeleted(String index, String nodeId) { for (Listener listener : listeners) { listener.onNodeIndexDeleted(index, nodeId); } } + private void innerNodeIndexStoreDeleted(String index, String nodeId) { + for (Listener listener : listeners) { + listener.onNodeIndexStoreDeleted(index, nodeId); + } + } + public static interface Listener { void onNodeIndexDeleted(String index, String nodeId); + + void onNodeIndexStoreDeleted(String index, String nodeId); } private class NodeIndexDeletedTransportHandler extends BaseTransportRequestHandler { @@ -110,6 +134,27 @@ public class NodeIndexDeletedAction extends AbstractComponent { } } + private class NodeIndexStoreDeletedTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeIndexStoreDeleted"; + + @Override + public NodeIndexStoreDeletedMessage newInstance() { + return new NodeIndexStoreDeletedMessage(); + } + + @Override + public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception { + innerNodeIndexStoreDeleted(message.index, message.nodeId); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + static class NodeIndexDeletedMessage extends TransportRequest { String index; @@ -137,4 +182,32 @@ public class NodeIndexDeletedAction extends AbstractComponent { nodeId = in.readString(); } } + + static class NodeIndexStoreDeletedMessage extends TransportRequest { + + String index; + String nodeId; + + NodeIndexStoreDeletedMessage() { + } + + NodeIndexStoreDeletedMessage(String index, String nodeId) { + this.index = index; + this.nodeId = nodeId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeString(nodeId); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + nodeId = in.readString(); + } + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 7ea452ce7f3..6dce1049408 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -116,7 +116,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { } @Override - public ClusterState execute(ClusterState currentState) { + public ClusterState execute(final ClusterState currentState) { if (!currentState.metaData().hasConcreteIndex(request.index)) { throw new IndexMissingException(new Index(request.index)); } @@ -136,8 +136,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent { ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build(); - final AtomicInteger counter = new AtomicInteger(currentState.nodes().size()); - + // wait for events from all nodes that it has been removed from their respective metadata... + int count = currentState.nodes().size(); + // add the notifications that the store was deleted from *date* nodes + count += currentState.nodes().dataNodes().size(); + final AtomicInteger counter = new AtomicInteger(count); final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() { @Override public void onNodeIndexDeleted(String index, String nodeId) { @@ -148,6 +151,16 @@ public class MetaDataDeleteIndexService extends AbstractComponent { } } } + + @Override + public void onNodeIndexStoreDeleted(String index, String nodeId) { + if (index.equals(request.index)) { + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(true)); + nodeIndexDeletedAction.remove(this); + } + } + } }; nodeIndexDeletedAction.add(nodeIndexDeleteListener); diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 2755d7c6e9b..7ac8a84618c 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -98,6 +99,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS private final ThreadPool threadPool; private final LocalAllocateDangledIndices allocateDangledIndices; + private final NodeIndexDeletedAction nodeIndexDeletedAction; @Nullable private volatile MetaData currentMetaData; @@ -113,12 +115,14 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS @Inject public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, - TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices) throws Exception { + TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices, + NodeIndexDeletedAction nodeIndexDeletedAction) throws Exception { super(settings); this.nodeEnv = nodeEnv; this.threadPool = threadPool; this.format = XContentType.fromRestContentType(settings.get("format", "smile")); this.allocateDangledIndices = allocateDangledIndices; + this.nodeIndexDeletedAction = nodeIndexDeletedAction; nodesListGatewayMetaState.init(this); if (this.format == XContentType.SMILE) { @@ -221,6 +225,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS if (!newMetaData.hasIndex(current.index())) { logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keySet()); FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index()))); + try { + nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId()); + } catch (Exception e) { + logger.debug("[{}] failed to notify master on local index store deletion", e, current.index()); + } } } } diff --git a/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java b/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java index 6ac06e82f44..dc11df6f1bd 100644 --- a/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java +++ b/src/main/java/org/elasticsearch/gateway/none/NoneGateway.java @@ -20,25 +20,47 @@ package org.elasticsearch.gateway.none; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.index.Index; import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule; /** * */ -public class NoneGateway extends AbstractLifecycleComponent implements Gateway { +public class NoneGateway extends AbstractLifecycleComponent implements Gateway, ClusterStateListener { public static final String TYPE = "none"; + private final ClusterService clusterService; + private final NodeEnvironment nodeEnv; + private final NodeIndexDeletedAction nodeIndexDeletedAction; + + @Nullable + private volatile MetaData currentMetaData; + @Inject - public NoneGateway(Settings settings) { + public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction) { super(settings); + this.clusterService = clusterService; + this.nodeEnv = nodeEnv; + this.nodeIndexDeletedAction = nodeIndexDeletedAction; + + clusterService.addLast(this); } @Override @@ -77,4 +99,36 @@ public class NoneGateway extends AbstractLifecycleComponent implements @Override public void reset() { } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().disableStatePersistence()) { + // reset the current metadata, we need to start fresh... + this.currentMetaData = null; + return; + } + + MetaData newMetaData = event.state().metaData(); + + // delete indices that were there before, but are deleted now + // we need to do it so they won't be detected as dangling + if (nodeEnv.hasNodeFile()) { + if (currentMetaData != null) { + // only delete indices when we already received a state (currentMetaData != null) + for (IndexMetaData current : currentMetaData) { + if (!newMetaData.hasIndex(current.index())) { + logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keySet()); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index()))); + try { + nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId()); + } catch (Exception e) { + logger.debug("[{}] failed to notify master on local index store deletion", e, current.index()); + } + } + } + } + } + + currentMetaData = newMetaData; + } } diff --git a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index 50d21592466..e2daf04a98f 100644 --- a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -19,12 +19,14 @@ package org.elasticsearch.gateway.shared; +import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.StopWatch; @@ -38,6 +40,7 @@ import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -59,6 +62,8 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent indicesDeleted = Sets.newHashSet(); if (event.localNodeMaster()) { logger.debug("writing to gateway {} ...", this); StopWatch stopWatch = new StopWatch().start(); @@ -149,6 +161,7 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent