Delete API ack to wait also for actual deletion of shards from disk

closes #3413
This commit is contained in:
Shay Banon 2013-07-30 18:58:02 +02:00
parent 61036390e2
commit 0e3c67cbf8
6 changed files with 182 additions and 7 deletions

View File

@ -109,14 +109,18 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
private volatile Throwable lastFailure;
private volatile boolean ack = true;
@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
if (!response.acknowledged()) {
ack = false;
}
if (count.decrementAndGet() == 0) {
if (lastFailure != null) {
listener.onFailure(lastFailure);
} else {
listener.onResponse(new DeleteIndexResponse(response.acknowledged()));
listener.onResponse(new DeleteIndexResponse(ack));
}
}
}

View File

@ -54,6 +54,7 @@ public class NodeIndexDeletedAction extends AbstractComponent {
this.transportService = transportService;
this.clusterService = clusterService;
transportService.registerHandler(NodeIndexDeletedTransportHandler.ACTION, new NodeIndexDeletedTransportHandler());
transportService.registerHandler(NodeIndexStoreDeletedTransportHandler.ACTION, new NodeIndexStoreDeletedTransportHandler());
}
public void add(Listener listener) {
@ -79,14 +80,37 @@ public class NodeIndexDeletedAction extends AbstractComponent {
}
}
public void nodeIndexStoreDeleted(final String index, final String nodeId) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerNodeIndexStoreDeleted(index, nodeId);
}
});
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeIndexStoreDeletedTransportHandler.ACTION, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
private void innerNodeIndexDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexDeleted(index, nodeId);
}
}
private void innerNodeIndexStoreDeleted(String index, String nodeId) {
for (Listener listener : listeners) {
listener.onNodeIndexStoreDeleted(index, nodeId);
}
}
public static interface Listener {
void onNodeIndexDeleted(String index, String nodeId);
void onNodeIndexStoreDeleted(String index, String nodeId);
}
private class NodeIndexDeletedTransportHandler extends BaseTransportRequestHandler<NodeIndexDeletedMessage> {
@ -110,6 +134,27 @@ public class NodeIndexDeletedAction extends AbstractComponent {
}
}
private class NodeIndexStoreDeletedTransportHandler extends BaseTransportRequestHandler<NodeIndexStoreDeletedMessage> {
static final String ACTION = "cluster/nodeIndexStoreDeleted";
@Override
public NodeIndexStoreDeletedMessage newInstance() {
return new NodeIndexStoreDeletedMessage();
}
@Override
public void messageReceived(NodeIndexStoreDeletedMessage message, TransportChannel channel) throws Exception {
innerNodeIndexStoreDeleted(message.index, message.nodeId);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
static class NodeIndexDeletedMessage extends TransportRequest {
String index;
@ -137,4 +182,32 @@ public class NodeIndexDeletedAction extends AbstractComponent {
nodeId = in.readString();
}
}
static class NodeIndexStoreDeletedMessage extends TransportRequest {
String index;
String nodeId;
NodeIndexStoreDeletedMessage() {
}
NodeIndexStoreDeletedMessage(String index, String nodeId) {
this.index = index;
this.nodeId = nodeId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(nodeId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
nodeId = in.readString();
}
}
}

View File

@ -116,7 +116,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState execute(final ClusterState currentState) {
if (!currentState.metaData().hasConcreteIndex(request.index)) {
throw new IndexMissingException(new Index(request.index));
}
@ -136,8 +136,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
// wait for events from all nodes that it has been removed from their respective metadata...
int count = currentState.nodes().size();
// add the notifications that the store was deleted from *date* nodes
count += currentState.nodes().dataNodes().size();
final AtomicInteger counter = new AtomicInteger(count);
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override
public void onNodeIndexDeleted(String index, String nodeId) {
@ -148,6 +151,16 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
}
}
@Override
public void onNodeIndexStoreDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
}
}
}
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -98,6 +99,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
private final ThreadPool threadPool;
private final LocalAllocateDangledIndices allocateDangledIndices;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
@Nullable
private volatile MetaData currentMetaData;
@ -113,12 +115,14 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
@Inject
public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv,
TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices) throws Exception {
TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices,
NodeIndexDeletedAction nodeIndexDeletedAction) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.format = XContentType.fromRestContentType(settings.get("format", "smile"));
this.allocateDangledIndices = allocateDangledIndices;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
nodesListGatewayMetaState.init(this);
if (this.format == XContentType.SMILE) {
@ -221,6 +225,11 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
if (!newMetaData.hasIndex(current.index())) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keySet());
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index())));
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId());
} catch (Exception e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
}
}

View File

@ -20,25 +20,47 @@
package org.elasticsearch.gateway.none;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
/**
*
*/
public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements Gateway {
public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
public static final String TYPE = "none";
private final ClusterService clusterService;
private final NodeEnvironment nodeEnv;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
@Nullable
private volatile MetaData currentMetaData;
@Inject
public NoneGateway(Settings settings) {
public NoneGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, NodeIndexDeletedAction nodeIndexDeletedAction) {
super(settings);
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
clusterService.addLast(this);
}
@Override
@ -77,4 +99,36 @@ public class NoneGateway extends AbstractLifecycleComponent<Gateway> implements
@Override
public void reset() {
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.currentMetaData = null;
return;
}
MetaData newMetaData = event.state().metaData();
// delete indices that were there before, but are deleted now
// we need to do it so they won't be detected as dangling
if (nodeEnv.hasNodeFile()) {
if (currentMetaData != null) {
// only delete indices when we already received a state (currentMetaData != null)
for (IndexMetaData current : currentMetaData) {
if (!newMetaData.hasIndex(current.index())) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keySet());
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index())));
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId());
} catch (Exception e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
}
}
}
currentMetaData = newMetaData;
}
}

View File

@ -19,12 +19,14 @@
package org.elasticsearch.gateway.shared;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.StopWatch;
@ -38,6 +40,7 @@ import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -59,6 +62,8 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
private NodeEnvironment nodeEnv;
private NodeIndexDeletedAction nodeIndexDeletedAction;
public SharedStorageGateway(Settings settings, ThreadPool threadPool, ClusterService clusterService) {
super(settings);
this.threadPool = threadPool;
@ -73,6 +78,12 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
this.nodeEnv = nodeEnv;
}
// here as setter injection not to break backward comp. with extensions of this class..
@Inject
public void setNodeIndexDeletedAction(NodeIndexDeletedAction nodeIndexDeletedAction) {
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
}
@Override
protected void doStart() throws ElasticSearchException {
}
@ -135,6 +146,7 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
writeStateExecutor.execute(new Runnable() {
@Override
public void run() {
Set<String> indicesDeleted = Sets.newHashSet();
if (event.localNodeMaster()) {
logger.debug("writing to gateway {} ...", this);
StopWatch stopWatch = new StopWatch().start();
@ -149,6 +161,7 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
for (IndexMetaData current : currentMetaData) {
if (!event.state().metaData().hasIndex(current.index())) {
delete(current);
indicesDeleted.add(current.index());
}
}
}
@ -158,11 +171,20 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
for (IndexMetaData current : currentMetaData) {
if (!event.state().metaData().hasIndex(current.index())) {
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index())));
indicesDeleted.add(current.index());
}
}
}
}
currentMetaData = event.state().metaData();
for (String indexDeleted : indicesDeleted) {
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(indexDeleted, event.state().nodes().masterNodeId());
} catch (Exception e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, indexDeleted);
}
}
}
});
}