Order methods in IndicesClusterStateService according to execution order
parent 3da7393b00
commit 749a851f93
@@ -191,39 +191,29 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
         }
     }
 
-    private void applyCleanedIndices(final ClusterChangedEvent event) {
-        // handle closed indices, since they are not allocated on a node once they are closed
-        // so applyDeletedIndices might not take them into account
-        for (IndexService indexService : indicesService) {
-            Index index = indexService.index();
-            IndexMetaData indexMetaData = event.state().metaData().index(index);
-            if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
-                for (Integer shardId : indexService.shardIds()) {
-                    logger.debug("{}[{}] removing shard (index is closed)", index, shardId);
-                    try {
-                        indexService.removeShard(shardId, "removing shard (index is closed)");
-                    } catch (Throwable e) {
-                        logger.warn("{} failed to remove shard (index is closed)", e, index);
-                    }
-                }
-            }
-        }
-
-        Set<Index> hasAllocations = new HashSet<>();
-        for (ShardRouting routing : event.state().getRoutingNodes().node(event.state().nodes().localNodeId())) {
-            hasAllocations.add(routing.index());
-        }
-        for (IndexService indexService : indicesService) {
-            Index index = indexService.index();
-            if (hasAllocations.contains(index) == false) {
-                assert indexService.shardIds().isEmpty() :
-                    "no locally assigned shards, but index wasn't emptied by applyDeletedShards."
-                        + " index " + index + ", shards: " + indexService.shardIds();
-                if (logger.isDebugEnabled()) {
-                    logger.debug("{} cleaning index (no shards allocated)", index);
-                }
-                // clean the index
-                removeIndex(index, "removing index (no shards allocated)");
+    private void cleanFailedShards(final ClusterChangedEvent event) {
+        RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
+        if (routingNode == null) {
+            failedShards.clear();
+            return;
+        }
+        RoutingTable routingTable = event.state().routingTable();
+        for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
+            Map.Entry<ShardId, ShardRouting> entry = iterator.next();
+            ShardId failedShardId = entry.getKey();
+            ShardRouting failedShardRouting = entry.getValue();
+            IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
+            if (indexRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
+            if (shardRoutingTable == null) {
+                iterator.remove();
+                continue;
+            }
+            if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
+                iterator.remove();
             }
         }
     }
@@ -305,23 +295,39 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
         }
     }
 
-    private void applyNewIndices(final ClusterChangedEvent event) {
-        // we only create indices for shards that are allocated
-        RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
-        if (routingNode == null) {
-            return;
-        }
-        for (ShardRouting shard : routingNode) {
-            if (!indicesService.hasIndex(shard.index())) {
-                final IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(shard.index());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}] creating index", indexMetaData.getIndex());
-                }
-                try {
-                    indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
-                } catch (Throwable e) {
-                    sendFailShard(shard, "failed to create index", e);
-                }
+    private void applyCleanedIndices(final ClusterChangedEvent event) {
+        // handle closed indices, since they are not allocated on a node once they are closed
+        // so applyDeletedIndices might not take them into account
+        for (IndexService indexService : indicesService) {
+            Index index = indexService.index();
+            IndexMetaData indexMetaData = event.state().metaData().index(index);
+            if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
+                for (Integer shardId : indexService.shardIds()) {
+                    logger.debug("{}[{}] removing shard (index is closed)", index, shardId);
+                    try {
+                        indexService.removeShard(shardId, "removing shard (index is closed)");
+                    } catch (Throwable e) {
+                        logger.warn("{} failed to remove shard (index is closed)", e, index);
+                    }
+                }
+            }
+        }
+
+        Set<Index> hasAllocations = new HashSet<>();
+        for (ShardRouting routing : event.state().getRoutingNodes().node(event.state().nodes().localNodeId())) {
+            hasAllocations.add(routing.index());
+        }
+        for (IndexService indexService : indicesService) {
+            Index index = indexService.index();
+            if (hasAllocations.contains(index) == false) {
+                assert indexService.shardIds().isEmpty() :
+                    "no locally assigned shards, but index wasn't emptied by applyDeletedShards."
+                        + " index " + index + ", shards: " + indexService.shardIds();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("{} cleaning index (no shards allocated)", index);
+                }
+                // clean the index
+                removeIndex(index, "removing index (no shards allocated)");
             }
         }
     }
@@ -349,6 +355,26 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
         }
     }
 
+    private void applyNewIndices(final ClusterChangedEvent event) {
+        // we only create indices for shards that are allocated
+        RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
+        if (routingNode == null) {
+            return;
+        }
+        for (ShardRouting shard : routingNode) {
+            if (!indicesService.hasIndex(shard.index())) {
+                final IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(shard.index());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("[{}] creating index", indexMetaData.getIndex());
+                }
+                try {
+                    indicesService.createIndex(nodeServicesProvider, indexMetaData, buildInIndexListener);
+                } catch (Throwable e) {
+                    sendFailShard(shard, "failed to create index", e);
+                }
+            }
+        }
+    }
 
     private void applyMappings(ClusterChangedEvent event) {
         // go over and update mappings
@@ -503,33 +529,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
         }
     }
 
-    private void cleanFailedShards(final ClusterChangedEvent event) {
-        RoutingNodes.RoutingNodeIterator routingNode = event.state().getRoutingNodes().routingNodeIter(event.state().nodes().localNodeId());
-        if (routingNode == null) {
-            failedShards.clear();
-            return;
-        }
-        RoutingTable routingTable = event.state().routingTable();
-        for (Iterator<Map.Entry<ShardId, ShardRouting>> iterator = failedShards.entrySet().iterator(); iterator.hasNext(); ) {
-            Map.Entry<ShardId, ShardRouting> entry = iterator.next();
-            ShardId failedShardId = entry.getKey();
-            ShardRouting failedShardRouting = entry.getValue();
-            IndexRoutingTable indexRoutingTable = routingTable.index(failedShardId.getIndex());
-            if (indexRoutingTable == null) {
-                iterator.remove();
-                continue;
-            }
-            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(failedShardId.id());
-            if (shardRoutingTable == null) {
-                iterator.remove();
-                continue;
-            }
-            if (shardRoutingTable.assignedShards().stream().noneMatch(shr -> shr.isSameAllocation(failedShardRouting))) {
-                iterator.remove();
-            }
-        }
-    }
-
     private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, IndexService indexService, final ShardRouting shardRouting) {
         final RoutingTable routingTable = state.routingTable();
         final DiscoveryNodes nodes = state.getNodes();
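Note on the resulting order: after this commit the private handler methods are declared in the same top-to-bottom order in which they run while a cluster-state update is processed (cleanFailedShards, then applyCleanedIndices, then applyNewIndices, then applyMappings, and so on). The sketch below is a minimal, self-contained Java illustration of that convention, not the actual Elasticsearch class; the class name and method bodies are hypothetical stand-ins, and the call order is inferred from the method ordering in the diff above.

// Hypothetical sketch -- not the real IndicesClusterStateService.
// The commit's convention: handlers are declared in the same order in
// which the cluster-state callback invokes them, so the file reads
// like the event-processing pipeline.
public class ClusterChangedOrderSketch {

    public static void main(String[] args) {
        new ClusterChangedOrderSketch().clusterChanged();
    }

    // runs the handlers top to bottom, matching their declaration order
    void clusterChanged() {
        cleanFailedShards();    // drop failedShards entries whose allocation no longer exists
        applyCleanedIndices();  // remove shards of closed indices and indices with no local shards
        applyNewIndices();      // create index services for shards newly allocated to this node
        applyMappings();        // apply mapping updates from the new cluster state
    }

    void cleanFailedShards()   { System.out.println("cleanFailedShards"); }
    void applyCleanedIndices() { System.out.println("applyCleanedIndices"); }
    void applyNewIndices()     { System.out.println("applyNewIndices"); }
    void applyMappings()       { System.out.println("applyMappings"); }
}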