optimize applyDeletes event
- reuse set - don't copy over again the shard ids immutable set
This commit is contained in:
parent
5b078ebfed
commit
09a6907cca
|
@ -192,7 +192,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ImmutableSet<Integer> shardIds() {
|
public ImmutableSet<Integer> shardIds() {
|
||||||
return ImmutableSet.copyOf(shards.keySet());
|
return shards.keySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.indices.cluster;
|
package org.elasticsearch.indices.cluster;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import gnu.trove.set.hash.TIntHashSet;
|
||||||
import org.elasticsearch.ElasticSearchException;
|
import org.elasticsearch.ElasticSearchException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
|
@ -65,11 +66,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import static com.google.common.collect.Maps.newHashMap;
|
import static com.google.common.collect.Maps.newHashMap;
|
||||||
import static com.google.common.collect.Sets.newHashSet;
|
|
||||||
import static org.elasticsearch.ExceptionsHelper.detailedMessage;
|
import static org.elasticsearch.ExceptionsHelper.detailedMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -242,39 +241,40 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyDeletedShards(final ClusterChangedEvent event) {
|
private void applyDeletedShards(final ClusterChangedEvent event) {
|
||||||
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
|
RoutingNode routingNode = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
|
||||||
if (routingNodes == null) {
|
if (routingNode == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (final String index : indicesService.indices()) {
|
TIntHashSet newShardIds = new TIntHashSet();
|
||||||
|
for (IndexService indexService : indicesService) {
|
||||||
|
String index = indexService.index().name();
|
||||||
IndexMetaData indexMetaData = event.state().metaData().index(index);
|
IndexMetaData indexMetaData = event.state().metaData().index(index);
|
||||||
if (indexMetaData != null) {
|
if (indexMetaData == null) {
|
||||||
// now, go over and delete shards that needs to get deleted
|
continue;
|
||||||
Set<Integer> newShardIds = newHashSet();
|
}
|
||||||
for (final ShardRouting shardRouting : routingNodes) {
|
// now, go over and delete shards that needs to get deleted
|
||||||
if (shardRouting.index().equals(index)) {
|
newShardIds.clear();
|
||||||
newShardIds.add(shardRouting.id());
|
List<MutableShardRouting> shards = routingNode.shards();
|
||||||
}
|
for (int i = 0; i < shards.size(); i++) {
|
||||||
|
ShardRouting shardRouting = shards.get(i);
|
||||||
|
if (shardRouting.index().equals(index)) {
|
||||||
|
newShardIds.add(shardRouting.id());
|
||||||
}
|
}
|
||||||
final IndexService indexService = indicesService.indexService(index);
|
}
|
||||||
if (indexService == null) {
|
for (Integer existingShardId : indexService.shardIds()) {
|
||||||
continue;
|
if (!newShardIds.contains(existingShardId)) {
|
||||||
}
|
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
|
||||||
for (Integer existingShardId : indexService.shardIds()) {
|
if (logger.isDebugEnabled()) {
|
||||||
if (!newShardIds.contains(existingShardId)) {
|
logger.debug("[{}][{}] removing shard (index is closed)", index, existingShardId);
|
||||||
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("[{}][{}] removing shard (index is closed)", index, existingShardId);
|
|
||||||
}
|
|
||||||
indexService.removeShard(existingShardId, "removing shard (index is closed)");
|
|
||||||
} else {
|
|
||||||
// we can just remove the shard, without cleaning it locally, since we will clean it
|
|
||||||
// when all shards are allocated in the IndicesStore
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("[{}][{}] removing shard (not allocated)", index, existingShardId);
|
|
||||||
}
|
|
||||||
indexService.removeShard(existingShardId, "removing shard (not allocated)");
|
|
||||||
}
|
}
|
||||||
|
indexService.removeShard(existingShardId, "removing shard (index is closed)");
|
||||||
|
} else {
|
||||||
|
// we can just remove the shard, without cleaning it locally, since we will clean it
|
||||||
|
// when all shards are allocated in the IndicesStore
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("[{}][{}] removing shard (not allocated)", index, existingShardId);
|
||||||
|
}
|
||||||
|
indexService.removeShard(existingShardId, "removing shard (not allocated)");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue