improve update of mappings, and delete index process with the new optimization of when index is created
This commit is contained in:
parent
0b593bd6d9
commit
c37a0afbf0
|
@ -21,18 +21,29 @@ package org.elasticsearch.cluster.metadata;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.timer.Timeout;
|
||||
import org.elasticsearch.common.timer.TimerTask;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.timer.TimerService;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.cluster.ClusterState.*;
|
||||
import static org.elasticsearch.cluster.metadata.MetaData.*;
|
||||
|
@ -42,19 +53,27 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
|
|||
*/
|
||||
public class MetaDataDeleteIndexService extends AbstractComponent {
|
||||
|
||||
private final TimerService timerService;
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final ShardsAllocation shardsAllocation;
|
||||
|
||||
@Inject public MetaDataDeleteIndexService(Settings settings, ClusterService clusterService, ShardsAllocation shardsAllocation) {
|
||||
private final NodeIndexDeletedAction nodeIndexDeletedAction;
|
||||
|
||||
@Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsAllocation shardsAllocation,
|
||||
NodeIndexDeletedAction nodeIndexDeletedAction) {
|
||||
super(settings);
|
||||
this.timerService = timerService;
|
||||
this.clusterService = clusterService;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
|
||||
}
|
||||
|
||||
public void deleteIndex(final Request request, final Listener listener) {
|
||||
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ProcessedClusterStateUpdateTask() {
|
||||
public void deleteIndex(final Request request, final Listener userListener) {
|
||||
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
final DeleteIndexListener listener = new DeleteIndexListener(request, userListener);
|
||||
try {
|
||||
RoutingTable routingTable = currentState.routingTable();
|
||||
if (!routingTable.hasIndex(request.index)) {
|
||||
|
@ -80,19 +99,85 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
|
||||
|
||||
// initialize the counter only for nodes the shards are allocated to
|
||||
Set<String> allocatedNodes = Sets.newHashSet();
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : currentState.routingTable().index(request.index)) {
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
if (shardRouting.currentNodeId() != null) {
|
||||
allocatedNodes.add(shardRouting.currentNodeId());
|
||||
}
|
||||
if (shardRouting.relocatingNodeId() != null) {
|
||||
allocatedNodes.add(shardRouting.relocatingNodeId());
|
||||
}
|
||||
}
|
||||
}
|
||||
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());
|
||||
|
||||
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
|
||||
@Override public void onNodeIndexDeleted(String index, String nodeId) {
|
||||
if (index.equals(request.index)) {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
listener.onResponse(new Response(true));
|
||||
nodeIndexDeletedAction.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
|
||||
|
||||
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
|
||||
@Override public void run(Timeout timeout) throws Exception {
|
||||
listener.onResponse(new Response(false));
|
||||
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
|
||||
}
|
||||
}, request.timeout, TimerService.ExecutionType.THREADED);
|
||||
listener.timeout = timeoutTask;
|
||||
|
||||
|
||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
class DeleteIndexListener implements Listener {
|
||||
|
||||
private AtomicBoolean notified = new AtomicBoolean();
|
||||
|
||||
private final Request request;
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
volatile Timeout timeout;
|
||||
|
||||
private DeleteIndexListener(Request request, Listener listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override public void onResponse(final Response response) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
if (timeout != null) {
|
||||
timeout.cancel();
|
||||
}
|
||||
listener.onResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
if (timeout != null) {
|
||||
timeout.cancel();
|
||||
}
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static interface Listener {
|
||||
|
||||
void onResponse(Response response);
|
||||
|
|
|
@ -69,32 +69,47 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
public void updateMapping(final String index, final String type, final String mappingSource) {
|
||||
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
|
||||
|
||||
DocumentMapper existingMapper = mapperService.documentMapper(type);
|
||||
// parse the updated one
|
||||
DocumentMapper updatedMapper = mapperService.parse(type, mappingSource);
|
||||
if (existingMapper == null) {
|
||||
existingMapper = updatedMapper;
|
||||
} else {
|
||||
// merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones)
|
||||
existingMapper.merge(updatedMapper, mergeFlags().simulate(false));
|
||||
}
|
||||
// build the updated mapping source
|
||||
if (logger.isDebugEnabled()) {
|
||||
try {
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, existingMapper.mappingSource().string());
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
try {
|
||||
IndexService indexService = indicesService.indexService(index);
|
||||
if (indexService == null) {
|
||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||
// only add the current relevant mapping (if exists)
|
||||
if (indexMetaData.mappings().containsKey(type)) {
|
||||
indexService.mapperService().add(type, indexMetaData.mappings().get(type).string());
|
||||
}
|
||||
}
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
||||
}
|
||||
MapperService mapperService = indexService.mapperService();
|
||||
|
||||
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, existingMapper.mappingSource()));
|
||||
return newClusterStateBuilder().state(currentState).metaData(builder).build();
|
||||
DocumentMapper existingMapper = mapperService.documentMapper(type);
|
||||
// parse the updated one
|
||||
DocumentMapper updatedMapper = mapperService.parse(type, mappingSource);
|
||||
if (existingMapper == null) {
|
||||
existingMapper = updatedMapper;
|
||||
} else {
|
||||
// merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones)
|
||||
existingMapper.merge(updatedMapper, mergeFlags().simulate(false));
|
||||
}
|
||||
// build the updated mapping source
|
||||
if (logger.isDebugEnabled()) {
|
||||
try {
|
||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, existingMapper.mappingSource().string());
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
} else if (logger.isInfoEnabled()) {
|
||||
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
||||
}
|
||||
|
||||
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, existingMapper.mappingSource()));
|
||||
return newClusterStateBuilder().state(currentState).metaData(builder).build();
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to dynamically update the mapping in cluster_state from shard", e);
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -143,10 +158,9 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
}
|
||||
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||
for (Map.Entry<String, CompressedString> mapping : indexMetaData.mappings().entrySet()) {
|
||||
if (!indexService.mapperService().hasMapping(mapping.getKey())) {
|
||||
indexService.mapperService().add(mapping.getKey(), mapping.getValue().string());
|
||||
}
|
||||
// only add the current relevant mapping (if exists)
|
||||
if (indexMetaData.mappings().containsKey(request.mappingType)) {
|
||||
indexService.mapperService().add(request.mappingType, indexMetaData.mappings().get(request.mappingType).string());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue