nicer code

This commit is contained in:
kimchy 2010-04-06 12:54:19 +03:00
parent b760ab18b6
commit 5cef456305
2 changed files with 69 additions and 48 deletions
modules/elasticsearch/src/main/java/org/elasticsearch

@ -78,7 +78,7 @@ public class ClusterState {
/**
* Returns a built (on demand) routing nodes view of the routing table. <b>NOTE, the routing nodes
* are immutable, use them just for read operations</b>
* are mutable, use them just for read operations</b>
*/
public RoutingNodes readOnlyRoutingNodes() {
if (routingNodes != null) {

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -60,7 +59,7 @@ import static com.google.common.collect.Sets.*;
import static org.elasticsearch.ExceptionsHelper.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
@ -107,9 +106,62 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indicesService.changesAllowed())
return;
MetaData metaData = event.state().metaData();
applyNewIndices(event);
applyMappings(event);
applyNewShards(event);
applyDeletedIndices(event);
applyDeletedShards(event);
}
private void applyDeletedIndices(final ClusterChangedEvent event) {
for (final String index : indicesService.indices()) {
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Deleting", index);
}
indicesService.deleteIndex(index);
threadPool.execute(new Runnable() {
@Override public void run() {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
}
});
}
}
}
private void applyDeletedShards(final ClusterChangedEvent event) {
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes == null) {
return;
}
for (final String index : indicesService.indices()) {
if (event.state().metaData().hasIndex(index)) {
// now, go over and delete shards that needs to get deleted
Set<Integer> newShardIds = newHashSet();
for (final ShardRouting shardRouting : routingNodes) {
if (shardRouting.index().equals(index)) {
newShardIds.add(shardRouting.id());
}
}
final IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
for (Integer existingShardId : indexService.shardIds()) {
if (!newShardIds.contains(existingShardId)) {
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Deleting shard [{}]", index, existingShardId);
}
indexService.deleteShard(existingShardId);
}
}
}
}
}
private void applyNewIndices(final ClusterChangedEvent event) {
// first, go over and create and indices that needs to be created
for (final IndexMetaData indexMetaData : metaData) {
for (final IndexMetaData indexMetaData : event.state().metaData()) {
if (!indicesService.hasIndex(indexMetaData.index())) {
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Creating", indexMetaData.index());
@ -122,11 +174,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
});
}
}
}
RoutingTable routingTable = event.state().routingTable();
private void applyMappings(ClusterChangedEvent event) {
// go over and update mappings
for (IndexMetaData indexMetaData : metaData) {
for (IndexMetaData indexMetaData : event.state().metaData()) {
if (!indicesService.hasIndex(indexMetaData.index())) {
// we only create / update here
continue;
@ -163,52 +216,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
}
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes != null) {
applyShards(routingNodes, routingTable, event.state().nodes());
}
// go over and delete either all indices or specific shards
for (final String index : indicesService.indices()) {
if (!metaData.hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Deleting", index);
}
indicesService.deleteIndex(index);
threadPool.execute(new Runnable() {
@Override public void run() {
nodeIndexDeletedAction.nodeIndexDeleted(index, event.state().nodes().localNodeId());
}
});
} else if (routingNodes != null) {
// now, go over and delete shards that needs to get deleted
Set<Integer> newShardIds = newHashSet();
for (final ShardRouting shardRouting : routingNodes) {
if (shardRouting.index().equals(index)) {
newShardIds.add(shardRouting.id());
}
}
final IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
for (Integer existingShardId : indexService.shardIds()) {
if (!newShardIds.contains(existingShardId)) {
if (logger.isDebugEnabled()) {
logger.debug("Index [{}]: Deleting shard [{}]", index, existingShardId);
}
indexService.deleteShard(existingShardId);
}
}
}
}
}
private void applyShards(final RoutingNode routingNodes, final RoutingTable routingTable, final Nodes nodes) throws ElasticSearchException {
private void applyNewShards(final ClusterChangedEvent event) throws ElasticSearchException {
if (!indicesService.changesAllowed())
return;
RoutingTable routingTable = event.state().routingTable();
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
if (routingNodes == null) {
return;
}
Nodes nodes = event.state().nodes();
for (final ShardRouting shardRouting : routingNodes) {
final IndexService indexService = indicesService.indexServiceSafe(shardRouting.index());