diff --git a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java index 69f78957aac..1919dfc4c1b 100644 --- a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java +++ b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.store.distributor; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.RateLimitedFSDirectory; import org.elasticsearch.index.store.DirectoryService; import java.io.IOException; @@ -50,6 +52,16 @@ public abstract class AbstractDistributor implements Distributor { } } + protected long getUsableSpace(Directory directory) { + if (directory instanceof RateLimitedFSDirectory) { + return ((RateLimitedFSDirectory) directory).wrappedDirectory().getDirectory().getUsableSpace(); + } else if (directory instanceof FSDirectory) { + return ((FSDirectory) directory).getDirectory().getUsableSpace(); + } else { + return 0; + } + } + protected abstract Directory doAny(); } diff --git a/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java index 1da7edca001..651c72b185e 100644 --- a/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java +++ b/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.store.distributor; import jsr166y.ThreadLocalRandom; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.store.DirectoryService; @@ -41,20 +40,22 @@ public class LeastUsedDistributor extends AbstractDistributor { public Directory doAny() { Directory directory = null; long size = Long.MIN_VALUE; + int sameSize = 0; for (Directory delegate : delegates) { - if (delegate instanceof FSDirectory) { - long currentSize = ((FSDirectory) delegate).getDirectory().getUsableSpace(); - if (currentSize > size) { - size = currentSize; + long currentSize = getUsableSpace(delegate); + if (currentSize > size) { + size = currentSize; + directory = delegate; + sameSize = 1; + } else if (currentSize == size) { + sameSize++; + // Ensure uniform distribution between all directories with the same size + if (ThreadLocalRandom.current().nextDouble() < 1.0 / sameSize) { directory = delegate; - } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) { - directory = delegate; - } else { } - } else { - directory = delegate; // really, make sense to have multiple directories for FS } } + return directory; } diff --git a/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java index a74ef520da9..a92ceba077f 100644 --- a/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java +++ b/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.store.distributor; import jsr166y.ThreadLocalRandom; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.store.DirectoryService; @@ -44,12 +43,7 @@ public class RandomWeightedDistributor extends AbstractDistributor { long size = 0; for (int i = 0; i < delegates.length; i++) { - Directory delegate = delegates[i]; - if (delegate instanceof FSDirectory) { - size += ((FSDirectory) delegate).getDirectory().getUsableSpace(); - } else { - // makes little sense to use multiple non fs directories - } + size += getUsableSpace(delegates[i]); usableSpace[i] = size; } diff --git a/src/test/java/org/elasticsearch/test/integration/indices/store/SimpleDistributorTests.java b/src/test/java/org/elasticsearch/test/integration/indices/store/SimpleDistributorTests.java new file mode 100644 index 00000000000..4e05d0022a0 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/store/SimpleDistributorTests.java @@ -0,0 +1,79 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.store; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.File; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +/** + * + */ +public class SimpleDistributorTests extends AbstractNodesTests { + protected Environment environment; + + @BeforeClass + public void getTestEnvironment() { + environment = ((InternalNode) startNode("node0")).injector().getInstance(Environment.class); + closeNode("node0"); + } + + @AfterClass + public void closeNodes() { + closeAllNodes(); + } + + public final static String[] STORE_TYPES = {"fs", "simplefs", "niofs", "mmapfs"}; + + @Test + public void testAvailableSpaceDetection() { + File dataRoot = environment.dataFiles()[0]; + startNode("node1", settingsBuilder().putArray("path.data", new File(dataRoot, "data1").getAbsolutePath(), new File(dataRoot, "data2").getAbsolutePath())); + for (String store : STORE_TYPES) { + try { + client("node1").admin().indices().prepareDelete("test").execute().actionGet(); + } catch (IndexMissingException ex) { + // Ignore + } + client("node1").admin().indices().prepareCreate("test") + .setSettings(settingsBuilder() + .put("index.store.distributor", StrictDistributor.class.getCanonicalName()) + .put("index.store.type", store) + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + ) + .execute().actionGet(); + assertThat(client("node1").admin().cluster().prepareHealth("test").setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(5)).execute().actionGet().isTimedOut(), equalTo(false)); + } + } + +} diff --git a/src/test/java/org/elasticsearch/test/integration/indices/store/StrictDistributor.java b/src/test/java/org/elasticsearch/test/integration/indices/store/StrictDistributor.java new file mode 100644 index 00000000000..17a2463e6d4 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/indices/store/StrictDistributor.java @@ -0,0 +1,49 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.indices.store; + +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.distributor.AbstractDistributor; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; + +/** + * + */ +public class StrictDistributor extends AbstractDistributor { + + @Inject + public StrictDistributor(DirectoryService directoryService) throws IOException { + super(directoryService); + } + + @Override + public Directory doAny() { + for (Directory delegate : delegates) { + assertThat(getUsableSpace(delegate), greaterThan(0L)); + } + return primary(); + } +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java b/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java index 17c5223e983..7c27aae9738 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java @@ -36,7 +36,7 @@ import static org.hamcrest.Matchers.*; public class DistributorTests { @Test - public void testEmptyFirstDistributor() throws Exception { + public void testLeastUsedDistributor() throws Exception { FakeFsDirectory[] directories = new FakeFsDirectory[]{ new FakeFsDirectory("dir0", 10L), new FakeFsDirectory("dir1", 20L), @@ -60,6 +60,34 @@ public class DistributorTests { } + directories[0].setUsableSpace(10L); + directories[1].setUsableSpace(20L); + directories[2].setUsableSpace(20L); + for (FakeFsDirectory directory : directories) { + directory.resetAllocationCount(); + } + for (int i = 0; i < 10000; i++) { + ((FakeFsDirectory) distributor.any()).incrementAllocationCount(); + } + assertThat(directories[0].getAllocationCount(), equalTo(0)); + assertThat((double) directories[1].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5)); + + // Test failover scenario + for (FakeFsDirectory directory : directories) { + directory.resetAllocationCount(); + } + directories[0].setUsableSpace(0L); + directories[1].setUsableSpace(0L); + directories[2].setUsableSpace(0L); + for (int i = 0; i < 10000; i++) { + ((FakeFsDirectory) distributor.any()).incrementAllocationCount(); + } + for (FakeFsDirectory directory : directories) { + assertThat(directory.getAllocationCount(), greaterThan(0)); + } + assertThat((double) directories[0].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5)); + assertThat((double) directories[1].getAllocationCount() / directories[2].getAllocationCount(), closeTo(1.0, 0.5)); + } @Test