River name re-use not possible between node shutdowns, closes #1921.

This effectively happens because we don't flush before deleting a mapping, which causes it to still reply the changes happening for it on the transaction log, meaning the mapping are still around on the index level.
This commit is contained in:
Shay Banon 2012-06-23 20:26:27 +02:00
parent 6fb836c25e
commit ded5b773da
1 changed files with 24 additions and 8 deletions

View File

@ -21,6 +21,8 @@ package org.elasticsearch.action.admin.indices.mapping.delete;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@ -49,6 +51,8 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
private final MetaDataMappingService metaDataMappingService; private final MetaDataMappingService metaDataMappingService;
private final TransportFlushAction flushAction;
private final TransportDeleteByQueryAction deleteByQueryAction; private final TransportDeleteByQueryAction deleteByQueryAction;
private final TransportRefreshAction refreshAction; private final TransportRefreshAction refreshAction;
@ -56,11 +60,12 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
@Inject @Inject
public TransportDeleteMappingAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportDeleteMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataMappingService metaDataMappingService, ThreadPool threadPool, MetaDataMappingService metaDataMappingService,
TransportDeleteByQueryAction deleteByQueryAction, TransportRefreshAction refreshAction) { TransportDeleteByQueryAction deleteByQueryAction, TransportRefreshAction refreshAction, TransportFlushAction flushAction) {
super(settings, transportService, clusterService, threadPool); super(settings, transportService, clusterService, threadPool);
this.metaDataMappingService = metaDataMappingService; this.metaDataMappingService = metaDataMappingService;
this.deleteByQueryAction = deleteByQueryAction; this.deleteByQueryAction = deleteByQueryAction;
this.refreshAction = refreshAction; this.refreshAction = refreshAction;
this.flushAction = flushAction;
} }
@Override @Override
@ -100,19 +105,30 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
deleteByQueryAction.execute(Requests.deleteByQueryRequest(request.indices()).query(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.typeFilter(request.type()))), new ActionListener<DeleteByQueryResponse>() { flushAction.execute(Requests.flushRequest(request.indices()), new ActionListener<FlushResponse>() {
@Override @Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) { public void onResponse(FlushResponse flushResponse) {
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() { deleteByQueryAction.execute(Requests.deleteByQueryRequest(request.indices()).query(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.typeFilter(request.type()))), new ActionListener<DeleteByQueryResponse>() {
@Override @Override
public void onResponse(RefreshResponse refreshResponse) { public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type())); refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
latch.countDown(); @Override
public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()));
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()));
latch.countDown();
}
});
} }
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type())); failureRef.set(e);
latch.countDown(); latch.countDown();
} }
}); });