diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index beac7e18b75..0450b024983 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.store; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import jsr166y.ThreadLocalRandom; import org.apache.lucene.store.*; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; @@ -37,6 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.index.store.support.ForceSyncDirectory; import java.io.File; @@ -73,12 +73,12 @@ public class Store extends AbstractIndexShardComponent { private final boolean sync; @Inject - public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException { + public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService, Distributor distributor) throws IOException { super(shardId, indexSettings); this.indexStore = indexStore; this.directoryService = directoryService; this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway... - this.directory = new StoreDirectory(directoryService.build()); + this.directory = new StoreDirectory(distributor); } public IndexStore indexStore() { @@ -297,14 +297,14 @@ public class Store extends AbstractIndexShardComponent { */ class StoreDirectory extends Directory implements ForceSyncDirectory { - private final Directory[] delegates; + private final Distributor distributor; - StoreDirectory(Directory[] delegates) throws IOException { - this.delegates = delegates; + StoreDirectory(Distributor distributor) throws IOException { + this.distributor = distributor; synchronized (mutex) { MapBuilder builder = MapBuilder.newMapBuilder(); - Map checksums = readChecksums(delegates, new HashMap()); - for (Directory delegate : delegates) { + Map checksums = readChecksums(distributor.all(), new HashMap()); + for (Directory delegate : distributor.all()) { for (String file : delegate.listAll()) { String checksum = checksums.get(file); builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), checksum, delegate)); @@ -316,7 +316,7 @@ public class Store extends AbstractIndexShardComponent { } public Directory[] delegates() { - return delegates; + return distributor.all(); } @Override @@ -397,29 +397,11 @@ public class Store extends AbstractIndexShardComponent { } public IndexOutput createOutput(String name, IOContext context, boolean raw) throws IOException { - Directory directory = null; + Directory directory; if (isChecksum(name)) { - directory = delegates[0]; + directory = distributor.primary(); } else { - if (delegates.length == 1) { - directory = delegates[0]; - } else { - long size = Long.MIN_VALUE; - for (Directory delegate : delegates) { - if (delegate instanceof FSDirectory) { - long currentSize = ((FSDirectory) delegate).getDirectory().getUsableSpace(); - if (currentSize > size) { - size = currentSize; - directory = delegate; - } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) { - directory = delegate; - } else { - } - } else { - directory = delegate; // really, make sense to have multiple directories for FS - } - } - } + directory = distributor.any(); } IndexOutput out = directory.createOutput(name, context); synchronized (mutex) { @@ -474,7 +456,7 @@ public class Store extends AbstractIndexShardComponent { @Override public void close() throws IOException { - for (Directory delegate : delegates) { + for (Directory delegate : distributor.all()) { delegate.close(); } synchronized (mutex) { @@ -485,27 +467,27 @@ public class Store extends AbstractIndexShardComponent { @Override public Lock makeLock(String name) { - return delegates[0].makeLock(name); + return distributor.primary().makeLock(name); } @Override public void clearLock(String name) throws IOException { - delegates[0].clearLock(name); + distributor.primary().clearLock(name); } @Override public void setLockFactory(LockFactory lockFactory) throws IOException { - delegates[0].setLockFactory(lockFactory); + distributor.primary().setLockFactory(lockFactory); } @Override public LockFactory getLockFactory() { - return delegates[0].getLockFactory(); + return distributor.primary().getLockFactory(); } @Override public String getLockID() { - return delegates[0].getLockID(); + return distributor.primary().getLockID(); } @Override diff --git a/src/main/java/org/elasticsearch/index/store/StoreModule.java b/src/main/java/org/elasticsearch/index/store/StoreModule.java index 1c7865e94e4..6e7a5618428 100644 --- a/src/main/java/org/elasticsearch/index/store/StoreModule.java +++ b/src/main/java/org/elasticsearch/index/store/StoreModule.java @@ -21,6 +21,9 @@ package org.elasticsearch.index.store; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.store.distributor.Distributor; +import org.elasticsearch.index.store.distributor.LeastUsedDistributor; +import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; import org.elasticsearch.jmx.JmxService; /** @@ -32,11 +35,17 @@ public class StoreModule extends AbstractModule { private final IndexStore indexStore; + private Class distributor; + public StoreModule(Settings settings, IndexStore indexStore) { this.indexStore = indexStore; this.settings = settings; } + public void setDistributor(Class distributor) { + this.distributor = distributor; + } + @Override protected void configure() { bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton(); @@ -44,5 +53,24 @@ public class StoreModule extends AbstractModule { if (JmxService.shouldExport(settings)) { bind(StoreManagement.class).asEagerSingleton(); } + if (distributor == null) { + distributor = loadDistributor(settings); + } + bind(Distributor.class).to(distributor).asEagerSingleton(); } + + private Class loadDistributor(Settings settings) { + final Class distributor; + final String type = settings.get("index.store.distributor"); + if ("least_used".equals(type)) { + distributor = LeastUsedDistributor.class; + } else if ("random".equals(type)) { + distributor = RandomWeightedDistributor.class; + } else { + distributor = settings.getAsClass("index.store.distributor", LeastUsedDistributor.class, + "org.elasticsearch.index.store.distributor.", "Distributor"); + } + return distributor; + } + } diff --git a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java new file mode 100644 index 00000000000..69f78957aac --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java @@ -0,0 +1,55 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.distributor; + +import org.apache.lucene.store.Directory; +import org.elasticsearch.index.store.DirectoryService; + +import java.io.IOException; + +public abstract class AbstractDistributor implements Distributor { + + protected final Directory[] delegates; + + protected AbstractDistributor(DirectoryService directoryService) throws IOException { + delegates = directoryService.build(); + } + + public Directory[] all() { + return delegates; + } + + @Override + public Directory primary() { + return delegates[0]; + } + + @Override + public Directory any() { + if (delegates.length == 1) { + return delegates[0]; + } else { + return doAny(); + } + } + + protected abstract Directory doAny(); + +} diff --git a/src/main/java/org/elasticsearch/index/store/distributor/Distributor.java b/src/main/java/org/elasticsearch/index/store/distributor/Distributor.java new file mode 100644 index 00000000000..47fd3bdf234 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/distributor/Distributor.java @@ -0,0 +1,44 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.distributor; + +import org.apache.lucene.store.Directory; + +/** + * Keeps track of available directories and selects a directory + * based on some distribution strategy + */ +public interface Distributor { + + /** + * Returns primary directory (typically first directory in the list) + */ + Directory primary(); + + /** + * Returns all directories + */ + Directory[] all(); + + /** + * Selects one of the directories based on distribution strategy + */ + Directory any(); +} diff --git a/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java new file mode 100644 index 00000000000..1da7edca001 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/distributor/LeastUsedDistributor.java @@ -0,0 +1,61 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.distributor; + +import jsr166y.ThreadLocalRandom; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.store.DirectoryService; + +import java.io.IOException; + +/** + * Implements directory distributor that always return the directory is the most available space + */ +public class LeastUsedDistributor extends AbstractDistributor { + + @Inject + public LeastUsedDistributor(DirectoryService directoryService) throws IOException { + super(directoryService); + } + + @Override + public Directory doAny() { + Directory directory = null; + long size = Long.MIN_VALUE; + for (Directory delegate : delegates) { + if (delegate instanceof FSDirectory) { + long currentSize = ((FSDirectory) delegate).getDirectory().getUsableSpace(); + if (currentSize > size) { + size = currentSize; + directory = delegate; + } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) { + directory = delegate; + } else { + } + } else { + directory = delegate; // really, make sense to have multiple directories for FS + } + } + return directory; + + } +} diff --git a/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java new file mode 100644 index 00000000000..a74ef520da9 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/store/distributor/RandomWeightedDistributor.java @@ -0,0 +1,68 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store.distributor; + +import jsr166y.ThreadLocalRandom; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.store.DirectoryService; + +import java.io.IOException; + +/** + * Implements directory distributor that picks a directory at random. The probability of selecting a directory + * is proportional to the amount of usable space in this directory. + */ +public class RandomWeightedDistributor extends AbstractDistributor { + + @Inject + public RandomWeightedDistributor(DirectoryService directoryService) throws IOException { + super(directoryService); + } + + @Override + public Directory doAny() { + long[] usableSpace = new long[delegates.length]; + long size = 0; + + for (int i = 0; i < delegates.length; i++) { + Directory delegate = delegates[i]; + if (delegate instanceof FSDirectory) { + size += ((FSDirectory) delegate).getDirectory().getUsableSpace(); + } else { + // makes little sense to use multiple non fs directories + } + usableSpace[i] = size; + } + + if (size != 0) { + long random = ThreadLocalRandom.current().nextLong(size); + for (int i = 0; i < delegates.length; i++) { + if (usableSpace[i] > random) { + return delegates[i]; + } + } + } + + // TODO: size is 0 - should we bail out or fall back on random distribution? + return delegates[ThreadLocalRandom.current().nextInt(delegates.length)]; + } +} diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java index f8868f801b9..8c1888abf6f 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java @@ -43,7 +43,9 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.distributor.LeastUsedDistributor; import org.elasticsearch.index.store.ram.RamDirectoryService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.fs.FsTranslog; @@ -122,11 +124,13 @@ public abstract class AbstractSimpleEngineTests { } protected Store createStore() throws IOException { - return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); + DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); + return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); } protected Store createStoreReplica() throws IOException { - return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); + DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS); + return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService)); } protected Translog createTranslog() { diff --git a/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java b/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java new file mode 100644 index 00000000000..17c5223e983 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/index/store/distributor/DistributorTests.java @@ -0,0 +1,183 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.unit.index.store.distributor; + +import org.apache.lucene.store.*; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.distributor.LeastUsedDistributor; +import org.elasticsearch.index.store.distributor.RandomWeightedDistributor; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + */ +public class DistributorTests { + + @Test + public void testEmptyFirstDistributor() throws Exception { + FakeFsDirectory[] directories = new FakeFsDirectory[]{ + new FakeFsDirectory("dir0", 10L), + new FakeFsDirectory("dir1", 20L), + new FakeFsDirectory("dir2", 30L) + }; + FakeDirectoryService directoryService = new FakeDirectoryService(directories); + + LeastUsedDistributor distributor = new LeastUsedDistributor(directoryService); + for (int i = 0; i < 5; i++) { + assertThat(distributor.any(), equalTo((Directory) directories[2])); + } + + directories[2].setUsableSpace(5L); + for (int i = 0; i < 5; i++) { + assertThat(distributor.any(), equalTo((Directory) directories[1])); + } + + directories[1].setUsableSpace(0L); + for (int i = 0; i < 5; i++) { + assertThat(distributor.any(), equalTo((Directory) directories[0])); + } + + + } + + @Test + public void testRandomWeightedDistributor() throws Exception { + FakeFsDirectory[] directories = new FakeFsDirectory[]{ + new FakeFsDirectory("dir0", 10L), + new FakeFsDirectory("dir1", 20L), + new FakeFsDirectory("dir2", 30L) + }; + FakeDirectoryService directoryService = new FakeDirectoryService(directories); + + RandomWeightedDistributor randomWeightedDistributor = new RandomWeightedDistributor(directoryService); + for (int i = 0; i < 10000; i++) { + ((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount(); + } + for (FakeFsDirectory directory : directories) { + assertThat(directory.getAllocationCount(), greaterThan(0)); + } + assertThat((double) directories[1].getAllocationCount() / directories[0].getAllocationCount(), closeTo(2.0, 0.5)); + assertThat((double) directories[2].getAllocationCount() / directories[0].getAllocationCount(), closeTo(3.0, 0.5)); + + for (FakeFsDirectory directory : directories) { + directory.resetAllocationCount(); + } + + directories[1].setUsableSpace(0L); + + for (int i = 0; i < 1000; i++) { + ((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount(); + } + + assertThat(directories[0].getAllocationCount(), greaterThan(0)); + assertThat(directories[1].getAllocationCount(), equalTo(0)); + assertThat(directories[2].getAllocationCount(), greaterThan(0)); + + } + + public static class FakeDirectoryService implements DirectoryService { + + private final Directory[] directories; + + public FakeDirectoryService(Directory[] directories) { + this.directories = directories; + } + + @Override + public Directory[] build() throws IOException { + return directories; + } + + @Override + public long throttleTimeInNanos() { + return 0; + } + + @Override + public void renameFile(Directory dir, String from, String to) throws IOException { + } + + @Override + public void fullDelete(Directory dir) throws IOException { + } + } + + public static class FakeFsDirectory extends FSDirectory { + + public int allocationCount; + + public FakeFile fakeFile; + + public FakeFsDirectory(String path, long usableSpace) throws IOException { + super(new File(path), NoLockFactory.getNoLockFactory()); + fakeFile = new FakeFile(path, usableSpace); + allocationCount = 0; + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + throw new UnsupportedOperationException("Shouldn't be called in the test"); + } + + public void setUsableSpace(long usableSpace) { + fakeFile.setUsableSpace(usableSpace); + } + + public void incrementAllocationCount() { + allocationCount++; + } + + public int getAllocationCount() { + return allocationCount; + } + + public void resetAllocationCount() { + allocationCount = 0; + } + + @Override + public File getDirectory() { + return fakeFile; + } + } + + public static class FakeFile extends File { + private long usableSpace; + + public FakeFile(String s, long usableSpace) { + super(s); + this.usableSpace = usableSpace; + } + + @Override + public long getUsableSpace() { + return usableSpace; + } + + public void setUsableSpace(long usableSpace) { + this.usableSpace = usableSpace; + } + } +}