smarter handling of throttling when preferring allocating to an existing work location
This commit is contained in:
parent
78b6879ecd
commit
b52d854711
|
@ -118,7 +118,9 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if we can allocate on that node...
|
// check if we can allocate on that node...
|
||||||
if (!nodeAllocations.canAllocate(shard, node, routingNodes).allocate()) {
|
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
|
||||||
|
// then we will try and assign it next time
|
||||||
|
if (nodeAllocations.canAllocate(shard, node, routingNodes) == Decision.NO) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,6 +192,11 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
|
||||||
} catch (IndexShardNotStartedException e) {
|
} catch (IndexShardNotStartedException e) {
|
||||||
listener.onIgnoreRecovery("shard closed");
|
listener.onIgnoreRecovery("shard closed");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
if (indexShard.state() == IndexShardState.CLOSED) {
|
||||||
|
// got closed on us, just ignore this recovery
|
||||||
|
listener.onIgnoreRecovery("shard closed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e));
|
listener.onRecoveryFailed(new IndexShardGatewayRecoveryException(shardId, "failed recovery", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,6 +220,10 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
||||||
testLoad(false);
|
testLoad(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean isPersistentStorage() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
private void testLoad(boolean fullRecovery) {
|
private void testLoad(boolean fullRecovery) {
|
||||||
startNode("server1");
|
startNode("server1");
|
||||||
|
|
||||||
|
@ -265,7 +269,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
||||||
for (IndexShardStatus indexShardStatus : statusResponse.index("test")) {
|
for (IndexShardStatus indexShardStatus : statusResponse.index("test")) {
|
||||||
for (ShardStatus shardStatus : indexShardStatus) {
|
for (ShardStatus shardStatus : indexShardStatus) {
|
||||||
if (shardStatus.shardRouting().primary()) {
|
if (shardStatus.shardRouting().primary()) {
|
||||||
if (fullRecovery) {
|
if (fullRecovery || !isPersistentStorage()) {
|
||||||
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), equalTo(0l));
|
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), equalTo(0l));
|
||||||
} else {
|
} else {
|
||||||
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), greaterThan(shardStatus.gatewayRecoveryStatus().indexSize().bytes() - 4098 /* segments file */));
|
assertThat(shardStatus.gatewayRecoveryStatus().reusedIndexSize().bytes(), greaterThan(shardStatus.gatewayRecoveryStatus().indexSize().bytes() - 4098 /* segments file */));
|
||||||
|
|
|
@ -26,4 +26,7 @@ import org.elasticsearch.test.integration.gateway.AbstractSimpleIndexGatewayTest
|
||||||
*/
|
*/
|
||||||
public class SimpleFsIndexInRamIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
public class SimpleFsIndexInRamIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
||||||
|
|
||||||
|
@Override protected boolean isPersistentStorage() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue