Automatic index creation can still cause "index missing" failures, closes #1199.
This commit is contained in:
parent
598370b6c7
commit
4a0b010d02
|
@ -20,6 +20,10 @@
|
|||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -70,6 +74,50 @@ public class ClusterChangedEvent {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the indices created in this event
|
||||
*/
|
||||
public List<String> indicesCreated() {
|
||||
if (previousState == null) {
|
||||
return Lists.newArrayList(state.metaData().indices().keySet());
|
||||
}
|
||||
if (!metaDataChanged()) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
List<String> created = null;
|
||||
for (String index : state.metaData().indices().keySet()) {
|
||||
if (!previousState.metaData().hasIndex(index)) {
|
||||
if (created == null) {
|
||||
created = Lists.newArrayList();
|
||||
}
|
||||
created.add(index);
|
||||
}
|
||||
}
|
||||
return created == null ? ImmutableList.<String>of() : created;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the indices deleted in this event
|
||||
*/
|
||||
public List<String> indicesDeleted() {
|
||||
if (previousState == null) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
if (!metaDataChanged()) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
List<String> deleted = null;
|
||||
for (String index : previousState.metaData().indices().keySet()) {
|
||||
if (!state.metaData().hasIndex(index)) {
|
||||
if (deleted == null) {
|
||||
deleted = Lists.newArrayList();
|
||||
}
|
||||
deleted.add(index);
|
||||
}
|
||||
}
|
||||
return deleted == null ? ImmutableList.<String>of() : deleted;
|
||||
}
|
||||
|
||||
public boolean metaDataChanged() {
|
||||
return state.metaData() != previousState.metaData();
|
||||
}
|
||||
|
|
|
@ -27,9 +27,7 @@ import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -63,7 +61,11 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -118,8 +120,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
|
||||
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
|
||||
final Set<String> allocatedNodes = Sets.newHashSet();
|
||||
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
try {
|
||||
try {
|
||||
|
@ -271,21 +271,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
|
||||
}
|
||||
|
||||
// initialize the counter only for nodes the shards are allocated to
|
||||
if (updatedState.routingTable().hasIndex(request.index)) {
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : updatedState.routingTable().index(request.index)) {
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
// if we have a routing for this shard on a node, and its not the master node (since we already created
|
||||
// an index on it), then add it
|
||||
if (shardRouting.currentNodeId() != null && !updatedState.nodes().localNodeId().equals(shardRouting.currentNodeId())) {
|
||||
allocatedNodes.add(shardRouting.currentNodeId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!allocatedNodes.isEmpty()) {
|
||||
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());
|
||||
// we wait for events from all nodes that the index has been added to the metadata
|
||||
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
|
||||
|
||||
final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() {
|
||||
@Override public void onNodeIndexCreated(String index, String nodeId) {
|
||||
|
@ -306,8 +293,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
nodeIndexCreatedAction.remove(nodeIndexCreatedListener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
return updatedState;
|
||||
} catch (Exception e) {
|
||||
|
@ -318,9 +303,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
|
||||
@Override public void clusterStateProcessed(ClusterState clusterState) {
|
||||
if (allocatedNodes.isEmpty()) {
|
||||
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -24,12 +24,9 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -38,7 +35,6 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -93,26 +89,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
|
||||
|
||||
// initialize the counter only for nodes the shards are allocated to
|
||||
Set<String> allocatedNodes = Sets.newHashSet();
|
||||
if (currentState.routingTable().hasIndex(request.index)) {
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : currentState.routingTable().index(request.index)) {
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
if (shardRouting.currentNodeId() != null) {
|
||||
allocatedNodes.add(shardRouting.currentNodeId());
|
||||
}
|
||||
if (shardRouting.relocatingNodeId() != null) {
|
||||
allocatedNodes.add(shardRouting.relocatingNodeId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (allocatedNodes.isEmpty()) {
|
||||
// no nodes allocated, don't wait for a response
|
||||
listener.onResponse(new Response(true));
|
||||
} else {
|
||||
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());
|
||||
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
|
||||
|
||||
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
|
||||
@Override public void onNodeIndexDeleted(String index, String nodeId) {
|
||||
|
@ -132,7 +109,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).metaData(newMetaData).blocks(blocks).build();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -180,6 +180,24 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
applyDeletedShards(event);
|
||||
applyCleanedIndices(event);
|
||||
applySettings(event);
|
||||
sendIndexLifecycleEvents(event);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendIndexLifecycleEvents(final ClusterChangedEvent event) {
|
||||
for (String index : event.indicesCreated()) {
|
||||
try {
|
||||
nodeIndexCreatedAction.nodeIndexCreated(index, event.state().nodes().localNodeId());
|
||||
} catch (Exception e) {
|
||||
logger.debug("failed to send to master index {} created event", index);
|
||||
}
|
||||
}
|
||||
for (String index : event.indicesDeleted()) {
|
||||
try {
|
||||
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
|
||||
} catch (Exception e) {
|
||||
logger.debug("failed to send to master index {} deleted event", index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -223,11 +241,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
}
|
||||
try {
|
||||
indicesService.deleteIndex(index, "deleting index");
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to delete index", e);
|
||||
}
|
||||
|
@ -292,11 +305,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
logger.debug("[{}] creating index", indexMetaData.index());
|
||||
}
|
||||
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
nodeIndexCreatedAction.nodeIndexCreated(indexMetaData.index(), event.state().nodes().localNodeId());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue