River: When deleting a river, make sure when its closed that its data is deleted, closes #1534.

This commit is contained in:
Shay Banon 2011-12-12 01:09:46 +02:00
parent de861d6f43
commit b0b379dc88
3 changed files with 46 additions and 5 deletions

View File

@ -235,14 +235,24 @@ public class MetaDataMappingService extends AbstractComponent {
throw new IndexMissingException(new Index("_all"));
}
logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType);
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
boolean changed = false;
for (String indexName : request.indices) {
if (currentState.metaData().hasIndex(indexName)) {
builder.put(newIndexMetaDataBuilder(currentState.metaData().index(indexName)).removeMapping(request.mappingType));
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData != null) {
if (indexMetaData.mappings().containsKey(request.mappingType)) {
builder.put(newIndexMetaDataBuilder(indexMetaData).removeMapping(request.mappingType));
changed = true;
}
}
}
if (!changed) {
return currentState;
}
logger.info("[{}] remove_mapping [{}]", request.indices, request.mappingType);
return ClusterState.builder().state(currentState).metaData(builder).build();
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
@ -208,11 +209,42 @@ public class RiversService extends AbstractLifecycleComponent<RiversService> {
RiverClusterState state = event.state();
// first, go over and delete ones that either don't exists or are not allocated
for (RiverName riverName : rivers.keySet()) {
for (final RiverName riverName : rivers.keySet()) {
RiverRouting routing = state.routing().routing(riverName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
closeRiver(riverName);
// also, double check and delete the river content if it was deleted (_meta does not exists)
try {
client.prepareGet(riverIndexName, riverName.name(), "_meta").setListenerThreaded(true).execute(new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (!getResponse.exists()) {
// verify the river is deleted
client.admin().indices().prepareDeleteMapping(riverIndexName).setType(riverName.name()).execute(new ActionListener<DeleteMappingResponse>() {
@Override
public void onResponse(DeleteMappingResponse deleteMappingResponse) {
// all is well...
}
@Override
public void onFailure(Throwable e) {
logger.debug("failed to (double) delete river [{}] content", e, riverName.name());
}
});
}
}
@Override
public void onFailure(Throwable e) {
logger.debug("failed to (double) delete river [{}] content", e, riverName.name());
}
});
} catch (IndexMissingException e) {
// all is well, the _river index was deleted
} catch (Exception e) {
logger.warn("unexpected failure when trying to verify river [{}] deleted", e, riverName.name());
}
}
}

View File

@ -106,7 +106,6 @@ public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> imple
if (!currentState.routing().hasRiverByName(mappingType)) {
// no river, we need to add it to the routing with no node allocation
try {
client.admin().indices().prepareRefresh(riverIndexName).execute().actionGet();
GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").execute().actionGet();
if (getResponse.exists()) {
String riverType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);