mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
refactor shard allocation to be more pluggable for node allocation "deciders"
This commit is contained in:
parent
2865ceef85
commit
5ded04c5cb
@ -26,9 +26,7 @@ import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.*;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedShardUnassignedStrategy;
|
||||
import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedStrategy;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
|
||||
import org.elasticsearch.cluster.service.InternalClusterService;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -46,8 +44,8 @@ public class ClusterModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(PreferUnallocatedStrategy.class).to(PreferUnallocatedShardUnassignedStrategy.class).asEagerSingleton();
|
||||
bind(ShardsRoutingStrategy.class).asEagerSingleton();
|
||||
|
||||
new ShardAllocationModule().configure(binder());
|
||||
|
||||
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
|
||||
bind(MetaDataCreateIndexService.class).asEagerSingleton();
|
||||
|
@ -28,7 +28,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
@ -54,16 +54,16 @@ public class ShardStateAction extends AbstractComponent {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final ShardsRoutingStrategy shardsRoutingStrategy;
|
||||
private final ShardsAllocation shardsAllocation;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
@Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||
ShardsRoutingStrategy shardsRoutingStrategy, ThreadPool threadPool) {
|
||||
ShardsAllocation shardsAllocation, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.shardsRoutingStrategy = shardsRoutingStrategy;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.threadPool = threadPool;
|
||||
|
||||
transportService.registerHandler(ShardStartedTransportHandler.ACTION, new ShardStartedTransportHandler());
|
||||
@ -125,7 +125,7 @@ public class ShardStateAction extends AbstractComponent {
|
||||
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
|
||||
}
|
||||
RoutingTable prevRoutingTable = currentState.routingTable();
|
||||
RoutingTable newRoutingTable = shardsRoutingStrategy.applyFailedShards(currentState, newArrayList(shardRouting));
|
||||
RoutingTable newRoutingTable = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting));
|
||||
if (prevRoutingTable == newRoutingTable) {
|
||||
return currentState;
|
||||
}
|
||||
@ -163,7 +163,7 @@ public class ShardStateAction extends AbstractComponent {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Applying started shard {}, reason [{}]", shardRouting, reason);
|
||||
}
|
||||
RoutingTable newRoutingTable = shardsRoutingStrategy.applyStartedShards(currentState, newArrayList(shardRouting));
|
||||
RoutingTable newRoutingTable = shardsAllocation.applyStartedShards(currentState, newArrayList(shardRouting));
|
||||
if (routingTable == newRoutingTable) {
|
||||
return currentState;
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
@ -74,18 +74,18 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final ShardsRoutingStrategy shardsRoutingStrategy;
|
||||
private final ShardsAllocation shardsAllocation;
|
||||
|
||||
private final NodeIndexCreatedAction nodeIndexCreatedAction;
|
||||
|
||||
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, IndicesService indicesService,
|
||||
ShardsRoutingStrategy shardsRoutingStrategy, NodeIndexCreatedAction nodeIndexCreatedAction) {
|
||||
ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction) {
|
||||
super(settings);
|
||||
this.environment = environment;
|
||||
this.timerService = timerService;
|
||||
this.clusterService = clusterService;
|
||||
this.indicesService = indicesService;
|
||||
this.shardsRoutingStrategy = shardsRoutingStrategy;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
|
||||
}
|
||||
|
||||
@ -265,9 +265,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||
}
|
||||
// do prefetch here so we won't compute md5 and such on the cluster update state...
|
||||
long prefetchTime = 0;
|
||||
if (shardsRoutingStrategy.preferUnallocatedStrategy() != null) {
|
||||
if (shardsAllocation.preferUnallocatedStrategy() != null) {
|
||||
long start = System.currentTimeMillis();
|
||||
shardsRoutingStrategy.preferUnallocatedStrategy().prefetch(response.indexMetaData(), clusterService.state().nodes());
|
||||
shardsAllocation.preferUnallocatedStrategy().prefetch(response.indexMetaData(), clusterService.state().nodes());
|
||||
prefetchTime = System.currentTimeMillis() - start;
|
||||
}
|
||||
final long fPrefetchTime = prefetchTime;
|
||||
@ -282,7 +282,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
||||
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
|
||||
.initializeEmpty(currentState.metaData().index(request.index));
|
||||
routingTableBuilder.add(indexRoutingBuilder);
|
||||
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
|
||||
RoutingTable newRoutingTable = shardsAllocation.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
|
||||
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
|
||||
}
|
||||
|
||||
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -52,16 +52,16 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final ShardsRoutingStrategy shardsRoutingStrategy;
|
||||
private final ShardsAllocation shardsAllocation;
|
||||
|
||||
private final NodeIndexDeletedAction nodeIndexDeletedAction;
|
||||
|
||||
@Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy,
|
||||
@Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsAllocation shardsAllocation,
|
||||
NodeIndexDeletedAction nodeIndexDeletedAction) {
|
||||
super(settings);
|
||||
this.timerService = timerService;
|
||||
this.clusterService = clusterService;
|
||||
this.shardsRoutingStrategy = shardsRoutingStrategy;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
|
||||
}
|
||||
|
||||
@ -89,7 +89,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
||||
.remove(request.index)
|
||||
.build();
|
||||
|
||||
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(
|
||||
RoutingTable newRoutingTable = shardsAllocation.reroute(
|
||||
newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
|
||||
|
||||
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();
|
||||
|
@ -117,30 +117,6 @@ public class RoutingNode implements Iterable<MutableShardRouting> {
|
||||
return count;
|
||||
}
|
||||
|
||||
public boolean canAllocate(RoutingNodes nodes) {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean canAllocate(ShardRouting requested) {
|
||||
for (MutableShardRouting current : shards) {
|
||||
// we do not allow for two shards of the same shard id to exists on the same node
|
||||
if (current.shardId().equals(requested.shardId())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean canAllocate(MutableShardRouting requested) {
|
||||
for (MutableShardRouting current : shards) {
|
||||
// we do not allow for two shards of the same shard id to exists on the same node
|
||||
if (current.shardId().equals(requested.shardId())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public String prettyPrint() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("-----node_id[").append(nodeId).append("]\n");
|
||||
|
@ -21,7 +21,7 @@ package org.elasticsearch.cluster.routing;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -42,7 +42,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private final ShardsRoutingStrategy shardsRoutingStrategy;
|
||||
private final ShardsAllocation shardsAllocation;
|
||||
|
||||
private final TimeValue schedule;
|
||||
|
||||
@ -50,11 +50,11 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
||||
|
||||
private volatile Future scheduledRoutingTableFuture;
|
||||
|
||||
@Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy) {
|
||||
@Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.shardsRoutingStrategy = shardsRoutingStrategy;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10));
|
||||
}
|
||||
|
||||
@ -120,7 +120,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
||||
}
|
||||
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(currentState);
|
||||
RoutingTable newRoutingTable = shardsAllocation.reroute(currentState);
|
||||
if (newRoutingTable == currentState.routingTable()) {
|
||||
// no state changed
|
||||
return currentState;
|
||||
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
||||
/**
|
||||
* A pluggable logic allowing to control if allocation of a shard is allowed on a specific node.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public interface NodeAllocation {
|
||||
|
||||
enum Decision {
|
||||
ALLOWED {
|
||||
@Override boolean allocate() {
|
||||
return true;
|
||||
}},
|
||||
DISALLOWED {
|
||||
@Override boolean allocate() {
|
||||
return false;
|
||||
}};
|
||||
|
||||
abstract boolean allocate();
|
||||
}
|
||||
|
||||
Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes);
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.collect.ImmutableSet;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Holds several {@link NodeAllocation}s and combines them into a single allocation decision.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class NodeAllocations extends AbstractComponent implements NodeAllocation {
|
||||
|
||||
private final NodeAllocation[] allocations;
|
||||
|
||||
public NodeAllocations(Settings settings) {
|
||||
this(settings, ImmutableSet.<NodeAllocation>builder()
|
||||
.add(new SameShardNodeAllocation(settings)
|
||||
).build()
|
||||
);
|
||||
}
|
||||
|
||||
@Inject public NodeAllocations(Settings settings, Set<NodeAllocation> allocations) {
|
||||
super(settings);
|
||||
this.allocations = allocations.toArray(new NodeAllocation[allocations.size()]);
|
||||
}
|
||||
|
||||
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
|
||||
for (NodeAllocation allocation : allocations) {
|
||||
Decision decision = allocation.canAllocate(shardRouting, node, routingNodes);
|
||||
if (decision == Decision.DISALLOWED) {
|
||||
return Decision.DISALLOWED;
|
||||
}
|
||||
}
|
||||
return Decision.ALLOWED;
|
||||
}
|
||||
}
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
@ -40,7 +40,6 @@ import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
|
||||
import java.util.Iterator;
|
||||
@ -51,18 +50,19 @@ import java.util.concurrent.CountDownLatch;
|
||||
*/
|
||||
public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent implements PreferUnallocatedStrategy {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData;
|
||||
|
||||
@Inject public PreferUnallocatedShardUnassignedStrategy(Settings settings, ThreadPool threadPool, IndicesService indicesService,
|
||||
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
|
||||
private final NodeAllocations nodeAllocations;
|
||||
|
||||
@Inject public PreferUnallocatedShardUnassignedStrategy(Settings settings, IndicesService indicesService,
|
||||
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData,
|
||||
NodeAllocations nodeAllocations) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.indicesService = indicesService;
|
||||
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
|
||||
this.nodeAllocations = nodeAllocations;
|
||||
}
|
||||
|
||||
@Override public void prefetch(IndexMetaData index, DiscoveryNodes nodes) {
|
||||
@ -149,7 +149,7 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
|
||||
}
|
||||
|
||||
// check if we can allocate on that node...
|
||||
if (!(node.canAllocate(routingNodes) && node.canAllocate(shard))) {
|
||||
if (!nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
|
||||
continue;
|
||||
}
|
||||
// if it is already allocated, we can't assign to it...
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.routing.MutableShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
/**
|
||||
* An allocation strategy that does not allow for the same shard instance to be allocated on the same node.
|
||||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SameShardNodeAllocation extends AbstractComponent implements NodeAllocation {
|
||||
|
||||
@Inject public SameShardNodeAllocation(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
|
||||
for (MutableShardRouting current : node.shards()) {
|
||||
// we do not allow for two shards of the same shard id to exists on the same node
|
||||
if (current.shardId().equals(shardRouting.shardId())) {
|
||||
return Decision.DISALLOWED;
|
||||
}
|
||||
}
|
||||
return Decision.ALLOWED;
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class ShardAllocationModule extends AbstractModule {
|
||||
|
||||
@Override protected void configure() {
|
||||
bind(ShardsAllocation.class).asEagerSingleton();
|
||||
bind(PreferUnallocatedStrategy.class).to(PreferUnallocatedShardUnassignedStrategy.class).asEagerSingleton();
|
||||
|
||||
Multibinder<NodeAllocation> decidersBinder = Multibinder.newSetBinder(binder(), NodeAllocation.class);
|
||||
decidersBinder.addBinding().to(SameShardNodeAllocation.class);
|
||||
|
||||
bind(NodeAllocations.class).asEagerSingleton();
|
||||
}
|
||||
}
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
@ -39,16 +39,24 @@ import static org.elasticsearch.common.collect.Sets.*;
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
public class ShardsAllocation extends AbstractComponent {
|
||||
|
||||
private final NodeAllocations nodeAllocations;
|
||||
|
||||
private final PreferUnallocatedStrategy preferUnallocatedStrategy;
|
||||
|
||||
public ShardsRoutingStrategy() {
|
||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS, null);
|
||||
public ShardsAllocation() {
|
||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
@Inject public ShardsRoutingStrategy(Settings settings, @Nullable PreferUnallocatedStrategy preferUnallocatedStrategy) {
|
||||
public ShardsAllocation(Settings settings) {
|
||||
this(settings, new NodeAllocations(settings), null);
|
||||
}
|
||||
|
||||
@Inject public ShardsAllocation(Settings settings, NodeAllocations nodeAllocations,
|
||||
@Nullable PreferUnallocatedStrategy preferUnallocatedStrategy) {
|
||||
super(settings);
|
||||
this.nodeAllocations = nodeAllocations;
|
||||
this.preferUnallocatedStrategy = preferUnallocatedStrategy;
|
||||
}
|
||||
|
||||
@ -171,7 +179,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (lowRoutingNode.canAllocate(routingNodes) && lowRoutingNode.canAllocate(startedShard)) {
|
||||
if (nodeAllocations.canAllocate(startedShard, lowRoutingNode, routingNodes).allocate()) {
|
||||
changed = true;
|
||||
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
|
||||
lowRoutingNode.nodeId(), startedShard.currentNodeId(),
|
||||
@ -228,6 +236,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
|
||||
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
|
||||
int lastNode = 0;
|
||||
|
||||
while (unassignedIterator.hasNext()) {
|
||||
MutableShardRouting shard = unassignedIterator.next();
|
||||
// if its a replica, only allocate it if the primary is active
|
||||
@ -242,10 +251,11 @@ public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
for (int i = 0; i < nodes.size(); i++) {
|
||||
RoutingNode node = nodes.get(lastNode);
|
||||
lastNode++;
|
||||
if (lastNode == nodes.size())
|
||||
if (lastNode == nodes.size()) {
|
||||
lastNode = 0;
|
||||
}
|
||||
|
||||
if (node.canAllocate(routingNodes) && node.canAllocate(shard)) {
|
||||
if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
|
||||
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
|
||||
if (numberOfShardsToAllocate <= 0) {
|
||||
continue;
|
||||
@ -271,7 +281,7 @@ public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
}
|
||||
// go over the nodes and try and allocate the remaining ones
|
||||
for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) {
|
||||
if (routingNode.canAllocate(routingNodes) && routingNode.canAllocate(shard)) {
|
||||
if (nodeAllocations.canAllocate(shard, routingNode, routingNodes).allocate()) {
|
||||
changed = true;
|
||||
routingNode.add(shard);
|
||||
it.remove();
|
||||
@ -462,10 +472,8 @@ public class ShardsRoutingStrategy extends AbstractComponent {
|
||||
boolean allocated = false;
|
||||
List<RoutingNode> sortedNodesLeastToHigh = routingNodes.sortedNodesLeastToHigh();
|
||||
for (RoutingNode target : sortedNodesLeastToHigh) {
|
||||
if (target.canAllocate(failedShard) &&
|
||||
target.canAllocate(routingNodes) &&
|
||||
!target.nodeId().equals(failedShard.currentNodeId())) {
|
||||
|
||||
if (!target.nodeId().equals(failedShard.currentNodeId()) &&
|
||||
nodeAllocations.canAllocate(failedShard, target, routingNodes).allocate()) {
|
||||
target.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
|
||||
target.nodeId(), failedShard.relocatingNodeId(),
|
||||
failedShard.primary(), INITIALIZING));
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -47,7 +47,7 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
|
||||
private final ESLogger logger = Loggers.getLogger(ElectReplicaAsPrimaryDuringRelocationTests.class);
|
||||
|
||||
@Test public void testElectReplicaAsPrimaryDuringRelocation() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -50,7 +50,7 @@ public class FailedShardsRoutingTests {
|
||||
private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class);
|
||||
|
||||
@Test public void testFailures() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
||||
@ -163,7 +163,7 @@ public class FailedShardsRoutingTests {
|
||||
}
|
||||
|
||||
@Test public void test10ShardsWith1ReplicaFailure() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -46,7 +46,7 @@ public class PrimaryElectionRoutingTests {
|
||||
private final ESLogger logger = Loggers.getLogger(PrimaryElectionRoutingTests.class);
|
||||
|
||||
@Test public void testBackupElectionToPrimaryWhenPrimaryCanBeAllocatedToAnotherNode() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -48,7 +48,7 @@ public class RebalanceAfterActiveTests {
|
||||
|
||||
@Test public void testRebalanceOnlyAfterAllShardsAreActive() {
|
||||
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -47,7 +47,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
|
||||
|
||||
@Test public void testBackupIsAllocatedAfterPrimary() {
|
||||
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -49,12 +49,12 @@ import static org.hamcrest.Matchers.*;
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SingleShardNoReplicasRoutingStrategyTests {
|
||||
public class SingleShardNoReplicasRoutingTests {
|
||||
|
||||
private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingStrategyTests.class);
|
||||
private final ESLogger logger = Loggers.getLogger(SingleShardNoReplicasRoutingTests.class);
|
||||
|
||||
@Test public void testSingleIndexStartedShard() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
||||
@ -154,7 +154,7 @@ public class SingleShardNoReplicasRoutingStrategyTests {
|
||||
}
|
||||
|
||||
@Test public void testSingleIndexShardFailed() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
||||
@ -203,7 +203,7 @@ public class SingleShardNoReplicasRoutingStrategyTests {
|
||||
}
|
||||
|
||||
@Test public void testMultiIndexEvenDistribution() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
final int numberOfIndices = 50;
|
||||
logger.info("Building initial routing table with " + numberOfIndices + " indices");
|
||||
@ -311,7 +311,7 @@ public class SingleShardNoReplicasRoutingStrategyTests {
|
||||
}
|
||||
|
||||
@Test public void testMultiIndexUnevenNodes() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
final int numberOfIndices = 10;
|
||||
logger.info("Building initial routing table with " + numberOfIndices + " indices");
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -41,12 +41,12 @@ import static org.hamcrest.Matchers.*;
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SingleShardOneReplicaRoutingStrategyTests {
|
||||
public class SingleShardOneReplicaRoutingTests {
|
||||
|
||||
private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingStrategyTests.class);
|
||||
private final ESLogger logger = Loggers.getLogger(SingleShardOneReplicaRoutingTests.class);
|
||||
|
||||
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -46,7 +46,7 @@ public class TenShardsOneReplicaRoutingTests {
|
||||
private final ESLogger logger = Loggers.getLogger(TenShardsOneReplicaRoutingTests.class);
|
||||
|
||||
@Test public void testSingleIndexFirstStartPrimaryThenBackups() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -1,4 +1,4 @@
|
||||
package org.elasticsearch.cluster.routing.strategy;
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
@ -28,7 +28,7 @@ public class UpdateNumberOfReplicasTests {
|
||||
private final ESLogger logger = Loggers.getLogger(UpdateNumberOfReplicasTests.class);
|
||||
|
||||
@Test public void testUpdateNumberOfReplicas() {
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
|
||||
logger.info("Building initial routing table");
|
||||
|
@ -24,7 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
|
||||
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
@ -56,7 +56,7 @@ public class ClusterSerializationTests {
|
||||
|
||||
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
clusterState = newClusterStateBuilder().state(clusterState).routingTable(strategy.reroute(clusterState)).build();
|
||||
|
||||
ClusterState serializedClusterState = ClusterState.Builder.fromBytes(ClusterState.Builder.toBytes(clusterState), ImmutableSettings.settingsBuilder().build(), newNode("node1"));
|
||||
@ -78,7 +78,7 @@ public class ClusterSerializationTests {
|
||||
|
||||
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
|
||||
|
||||
ShardsRoutingStrategy strategy = new ShardsRoutingStrategy();
|
||||
ShardsAllocation strategy = new ShardsAllocation();
|
||||
RoutingTable source = strategy.reroute(clusterState);
|
||||
|
||||
BytesStreamOutput outStream = new BytesStreamOutput();
|
||||
|
Loading…
x
Reference in New Issue
Block a user