improve replica allocation, fix test to wait for recovery on node2

parent 9503fca2ae
commit 289cfdf25a
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.gateway.local;
 
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.*;
@@ -29,12 +30,22 @@ import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.service.InternalIndexService;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.IndexStore;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
+import org.elasticsearch.transport.ConnectTransportException;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
 
@@ -43,14 +54,36 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
  */
 public class LocalGatewayNodeAllocation extends NodeAllocation {
 
+    private final IndicesService indicesService;
+
     private final TransportNodesListGatewayState listGatewayState;
 
-    @Inject public LocalGatewayNodeAllocation(Settings settings, TransportNodesListGatewayState listGatewayState) {
+    private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
+
+    private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
+
+    private final TimeValue listTimeout;
+
+    @Inject public LocalGatewayNodeAllocation(Settings settings, IndicesService indicesService,
+                                              TransportNodesListGatewayState listGatewayState, TransportNodesListShardStoreMetaData listShardStoreMetaData) {
         super(settings);
+        this.indicesService = indicesService;
         this.listGatewayState = listGatewayState;
+        this.listShardStoreMetaData = listShardStoreMetaData;
+
+        this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
+    }
+
+    @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
+        for (ShardRouting shardRouting : startedShards) {
+            cachedStores.remove(shardRouting.shardId());
+        }
     }
 
     @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
+        for (ShardRouting shardRouting : failedShards) {
+            cachedStores.remove(shardRouting.shardId());
+        }
         for (ShardRouting failedShard : failedShards) {
             IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(failedShard.index());
             if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
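The new cachedStores field in the hunk above keeps per-shard store listings between allocation rounds and drops them in applyStartedShards and applyFailedShards, once a shard starts or fails its cached listing can no longer be trusted. Below is a minimal standalone sketch of that cache-and-invalidate pattern; ShardKey, StoreListing and the method names are simplified stand-ins for illustration, not the Elasticsearch classes in the diff.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: remember a store listing per shard, reuse it across allocation rounds,
// and drop it as soon as the shard's on-disk state changes (started or failed).
public final class ShardStoreCacheSketch {

    record ShardKey(String index, int shard) {}
    record StoreListing(Map<String, Long> files) {}   // fileName -> length

    private final Map<ShardKey, StoreListing> cachedStores = new ConcurrentHashMap<>();

    StoreListing getOrList(ShardKey key) {
        // plays the role of buildShardStores in the diff: list once, then reuse
        return cachedStores.computeIfAbsent(key, k -> new StoreListing(Map.of()));
    }

    void onShardStarted(ShardKey key) {
        cachedStores.remove(key); // mirrors applyStartedShards
    }

    void onShardFailed(ShardKey key) {
        cachedStores.remove(key); // mirrors applyFailedShards
    }

    public static void main(String[] args) {
        ShardStoreCacheSketch cache = new ShardStoreCacheSketch();
        ShardKey key = new ShardKey("test", 0);
        cache.getOrList(key);
        cache.onShardStarted(key); // started: cached listing is stale, drop it
        System.out.println("cache size after start: " + cache.cachedStores.size());
    }
}

Invalidating on both started and failed shards keeps the cache from ever handing the allocator a listing for a shard whose files have just changed underneath it.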
@@ -153,6 +186,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
             MutableShardRouting shardRouting = it.next();
             if (shardRouting.primary()) {
                 DiscoveryNode node = shards.get(shardRouting.shardId()).v1();
+                logger.debug("[{}][{}] initial allocation to [{}]", shardRouting.index(), shardRouting.id(), node);
                 RoutingNode routingNode = routingNodes.node(node.id());
                 routingNode.add(shardRouting);
                 it.remove();
@@ -162,8 +196,181 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
             }
         }
 
-        // TODO optimize replica allocation to existing work locations
+        if (!routingNodes.hasUnassigned()) {
+            return changed;
+        }
+
+        Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
+        while (unassignedIterator.hasNext()) {
+            MutableShardRouting shard = unassignedIterator.next();
+            InternalIndexService indexService = (InternalIndexService) indicesService.indexService(shard.index());
+            if (indexService == null) {
+                continue;
+            }
+            // if the store is not persistent, it makes no sense to test for special allocation
+            if (!indexService.store().persistent()) {
+                continue;
+            }
+
+            // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
+            boolean canBeAllocatedToAtLeastOneNode = false;
+            for (DiscoveryNode discoNode : nodes.dataNodes().values()) {
+                RoutingNode node = routingNodes.node(discoNode.id());
+                if (node == null) {
+                    continue;
+                }
+                // if its THROTTLING, we are not going to allocate it to this node, so ignore it as well
+                if (nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
+                    canBeAllocatedToAtLeastOneNode = true;
+                    break;
+                }
+            }
+
+            if (!canBeAllocatedToAtLeastOneNode) {
+                continue;
+            }
+
+            ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
+
+            long lastSizeMatched = 0;
+            DiscoveryNode lastDiscoNodeMatched = null;
+            RoutingNode lastNodeMatched = null;
+
+            for (Map.Entry<DiscoveryNode, IndexStore.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
+                DiscoveryNode discoNode = nodeStoreEntry.getKey();
+                IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
+                logger.trace("{}: checking node [{}]", shard, discoNode);
+
+                if (storeFilesMetaData == null) {
+                    // already allocated on that node...
+                    continue;
+                }
+
+                RoutingNode node = routingNodes.node(discoNode.id());
+                if (node == null) {
+                    continue;
+                }
+
+                // check if we can allocate on that node...
+                // we only check for NO, since if this node is THROTTLING and it has enough "same data"
+                // then we will try and assign it next time
+                if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) {
+                    continue;
+                }
+
+                // if it is already allocated, we can't assign to it...
+                if (storeFilesMetaData.allocated()) {
+                    continue;
+                }
+
+                if (!shard.primary()) {
+                    MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard);
+                    if (primaryShard != null && primaryShard.active()) {
+                        DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
+                        if (primaryNode != null) {
+                            IndexStore.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
+                            if (primaryNodeStore != null && primaryNodeStore.allocated()) {
+                                long sizeMatched = 0;
+
+                                for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
+                                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+                                        sizeMatched += storeFileMetaData.length();
+                                    }
+                                }
+                                if (sizeMatched > lastSizeMatched) {
+                                    lastSizeMatched = sizeMatched;
+                                    lastDiscoNodeMatched = discoNode;
+                                    lastNodeMatched = node;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+            if (lastNodeMatched != null) {
+                if (nodeAllocations.canAllocate(shard, lastNodeMatched, routingNodes) == NodeAllocation.Decision.THROTTLE) {
+                    if (logger.isTraceEnabled()) {
+                        logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
+                    }
+                    // we are throttling this, but we have enough to allocate to this node, ignore it for now
+                    unassignedIterator.remove();
+                    routingNodes.ignoredUnassigned().add(shard);
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
+                    }
+                    // we found a match
+                    changed = true;
+                    lastNodeMatched.add(shard);
+                    unassignedIterator.remove();
+                }
+            }
+        }
 
         return changed;
     }
+
+    private ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
+        ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
+        if (shardStores == null) {
+            shardStores = ConcurrentCollections.newConcurrentMap();
+            TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
+            if (logger.isDebugEnabled()) {
+                if (nodesStoreFilesMetaData.failures().length > 0) {
+                    StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+                    for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+                        Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+                        if (cause instanceof ConnectTransportException) {
+                            continue;
+                        }
+                        sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+                    }
+                    logger.debug(sb.toString());
+                }
+            }
+
+            for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+                shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+            }
+            cachedStores.put(shard.shardId(), shardStores);
+        } else {
+            // clean nodes that have failed
+            for (DiscoveryNode node : shardStores.keySet()) {
+                if (!nodes.nodeExists(node.id())) {
+                    shardStores.remove(node);
+                }
+            }
+
+            // we have stored cached from before, see if the nodes changed, if they have, go fetch again
+            Set<String> fetchedNodes = Sets.newHashSet();
+            for (DiscoveryNode node : nodes.dataNodes().values()) {
+                if (!shardStores.containsKey(node)) {
+                    fetchedNodes.add(node.id());
+                }
+            }
+
+            if (!fetchedNodes.isEmpty()) {
+                TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet();
+                if (logger.isTraceEnabled()) {
+                    if (nodesStoreFilesMetaData.failures().length > 0) {
+                        StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+                        for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+                            Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+                            if (cause instanceof ConnectTransportException) {
+                                continue;
+                            }
+                            sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+                        }
+                        logger.trace(sb.toString());
+                    }
+                }
+
+                for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+                    shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+                }
+            }
+        }
+        return shardStores;
+    }
 }
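The bulk of the hunk above replaces the old TODO with a concrete heuristic: for each unassigned replica, list the store files held by each data node (via buildShardStores) and prefer the node whose existing, unallocated files best match the primary's, measured in bytes of files that match by name and length; if the deciders answer THROTTLE, the shard is parked in ignoredUnassigned and retried later. Below is a minimal sketch of just the matching rule, with plain maps standing in for StoreFilesMetaData; pickBestNode and its parameters are illustrative and not part of the commit.

import java.util.Map;

// Sketch only: pick the candidate node whose on-disk files overlap the most with the
// primary's, counting a file only when both name and length match.
public final class StoreMatchSketch {

    // fileName -> length, a simplified stand-in for a store file listing
    public static String pickBestNode(Map<String, Long> primaryFiles,
                                      Map<String, Map<String, Long>> candidateNodeFiles) {
        String bestNode = null;
        long bestMatched = 0;
        for (Map.Entry<String, Map<String, Long>> candidate : candidateNodeFiles.entrySet()) {
            long matched = 0;
            for (Map.Entry<String, Long> file : candidate.getValue().entrySet()) {
                Long primaryLength = primaryFiles.get(file.getKey());
                // a file only counts if the primary has it with the same length
                if (primaryLength != null && primaryLength.equals(file.getValue())) {
                    matched += file.getValue();
                }
            }
            if (matched > bestMatched) {
                bestMatched = matched;
                bestNode = candidate.getKey();
            }
        }
        return bestNode; // null means no candidate shares any usable data
    }

    public static void main(String[] args) {
        Map<String, Long> primary = Map.of("_0.cfs", 1_000_000L, "segments_2", 400L);
        Map<String, Map<String, Long>> candidates = Map.of(
                "node2", Map.of("_0.cfs", 1_000_000L),   // full segment match
                "node3", Map.of("_0.cfs", 900_000L));    // stale copy, different length
        System.out.println(pickBestNode(primary, candidates)); // prints node2
    }
}

Counting only identically sized files is a cheap proxy for "data that recovery will not have to copy again", which is exactly what the allocator wants to maximize when choosing where to place a replica.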
@@ -66,7 +66,6 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
 
     @Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
         recoveryStatus().index().startTime(System.currentTimeMillis());
-        // read the gateway data persisted
         long version = -1;
         try {
             if (IndexReader.indexExists(indexShard.store().directory())) {
@@ -72,8 +72,10 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
         assertThat(clusterHealth.timedOut(), equalTo(false));
         assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
 
+        for (int i = 0; i < 10; i++) {
             assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
         }
+    }
 
     @Test public void testTwoNodeFirstNodeCleared() throws Exception {
         // clean two nodes
@@ -89,7 +91,15 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
         node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
         node1.client().admin().indices().prepareRefresh().execute().actionGet();
 
+        logger.info("Running Cluster Health (wait for the shards to startup)");
+        ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
+        logger.info("Done Cluster Health, status " + clusterHealth.status());
+        assertThat(clusterHealth.timedOut(), equalTo(false));
+        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
+
+        for (int i = 0; i < 10; i++) {
             assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
+        }
 
         logger.info("--> closing nodes");
         closeNode("node1");
@@ -103,11 +113,13 @@ public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
         node2 = startNode("node2", settingsBuilder().put("gateway.type", "local").put("gateway.recover_after_nodes", 2).build());
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
-        ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
+        clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForActiveShards(2)).actionGet();
         logger.info("Done Cluster Health, status " + clusterHealth.status());
         assertThat(clusterHealth.timedOut(), equalTo(false));
         assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
 
+        for (int i = 0; i < 10; i++) {
             assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
         }
+        }
     }
 }
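The test changes above all follow one pattern: after indexing or restarting nodes, block on a cluster health call with waitForGreenStatus().waitForActiveShards(2), assert it did not time out, and only then check document counts, so the count is never read while replicas are still recovering on node2. A minimal sketch of that wait-then-assert shape in plain Java follows; awaitTrue and its arguments are illustrative helpers, not an Elasticsearch test utility.

import java.util.function.BooleanSupplier;

// Sketch only: poll a readiness condition up to a timeout before running assertions,
// the same shape as waiting for green health before counting documents.
public final class WaitThenAssert {

    static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;   // condition reached: safe to run assertions
            }
            Thread.sleep(100); // poll, as a health request with a wait_for condition does server-side
        }
        return false;          // timed out, mirrors ClusterHealthResponse.timedOut()
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // stand-in for "wait for green with 2 active shards"
        boolean ready = awaitTrue(() -> System.currentTimeMillis() - start > 300, 5_000);
        System.out.println(ready ? "ready, run count assertions" : "timed out, fail the test");
    }
}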
@@ -2,6 +2,7 @@ log4j.rootLogger=INFO, out
 log4j.logger.jgroups=WARN
 
 #log4j.logger.action=DEBUG
+#log4j.logger.gateway=DEBUG
 #log4j.logger.transport=TRACE
 #log4j.logger.discovery=TRACE
 #log4j.logger.cluster.service=TRACE