mirror of
synced 2025-03-24 17:09:48 +00:00
@ -22,7 +22,6 @@ package org.elasticsearch.index.store;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.store.*;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
@ -37,6 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import java.io.File;
@ -73,12 +73,12 @@ public class Store extends AbstractIndexShardComponent {
private final boolean sync;
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException {
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService, Distributor distributor) throws IOException {
super(shardId, indexSettings);
this.indexStore = indexStore;
this.directoryService = directoryService;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(directoryService.build());
this.directory = new StoreDirectory(distributor);
public IndexStore indexStore() {
@ -297,14 +297,14 @@ public class Store extends AbstractIndexShardComponent {
class StoreDirectory extends Directory implements ForceSyncDirectory {
private final Directory[] delegates;
private final Distributor distributor;
StoreDirectory(Directory[] delegates) throws IOException {
this.delegates = delegates;
StoreDirectory(Distributor distributor) throws IOException {
this.distributor = distributor;
synchronized (mutex) {
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
Map<String, String> checksums = readChecksums(delegates, new HashMap<String, String>());
for (Directory delegate : delegates) {
Map<String, String> checksums = readChecksums(distributor.all(), new HashMap<String, String>());
for (Directory delegate : distributor.all()) {
for (String file : delegate.listAll()) {
String checksum = checksums.get(file);
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), checksum, delegate));
@ -316,7 +316,7 @@ public class Store extends AbstractIndexShardComponent {
public Directory[] delegates() {
return delegates;
return distributor.all();
@ -397,29 +397,11 @@ public class Store extends AbstractIndexShardComponent {
public IndexOutput createOutput(String name, IOContext context, boolean raw) throws IOException {
Directory directory = null;
Directory directory;
if (isChecksum(name)) {
directory = delegates[0];
directory = distributor.primary();
} else {
if (delegates.length == 1) {
directory = delegates[0];
} else {
long size = Long.MIN_VALUE;
for (Directory delegate : delegates) {
if (delegate instanceof FSDirectory) {
long currentSize = ((FSDirectory) delegate).getDirectory().getUsableSpace();
if (currentSize > size) {
size = currentSize;
directory = delegate;
} else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) {
directory = delegate;
} else {
} else {
directory = delegate; // really, make sense to have multiple directories for FS
directory = distributor.any();
IndexOutput out = directory.createOutput(name, context);
synchronized (mutex) {
@ -474,7 +456,7 @@ public class Store extends AbstractIndexShardComponent {
public void close() throws IOException {
for (Directory delegate : delegates) {
for (Directory delegate : distributor.all()) {
synchronized (mutex) {
@ -485,27 +467,27 @@ public class Store extends AbstractIndexShardComponent {
public Lock makeLock(String name) {
return delegates[0].makeLock(name);
return distributor.primary().makeLock(name);
public void clearLock(String name) throws IOException {
public void setLockFactory(LockFactory lockFactory) throws IOException {
public LockFactory getLockFactory() {
return delegates[0].getLockFactory();
return distributor.primary().getLockFactory();
public String getLockID() {
return delegates[0].getLockID();
return distributor.primary().getLockID();
@ -21,6 +21,9 @@ package org.elasticsearch.index.store;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
import org.elasticsearch.jmx.JmxService;
@ -32,11 +35,17 @@ public class StoreModule extends AbstractModule {
private final IndexStore indexStore;
private Class<? extends Distributor> distributor;
public StoreModule(Settings settings, IndexStore indexStore) {
this.indexStore = indexStore;
this.settings = settings;
public void setDistributor(Class<? extends Distributor> distributor) {
this.distributor = distributor;
protected void configure() {
@ -44,5 +53,24 @@ public class StoreModule extends AbstractModule {
if (JmxService.shouldExport(settings)) {
if (distributor == null) {
distributor = loadDistributor(settings);
private Class<? extends Distributor> loadDistributor(Settings settings) {
final Class<? extends Distributor> distributor;
final String type = settings.get("index.store.distributor");
if ("least_used".equals(type)) {
distributor = LeastUsedDistributor.class;
} else if ("random".equals(type)) {
distributor = RandomWeightedDistributor.class;
} else {
distributor = settings.getAsClass("index.store.distributor", LeastUsedDistributor.class,
"org.elasticsearch.index.store.distributor.", "Distributor");
return distributor;
@ -0,0 +1,55 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store.distributor;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.store.DirectoryService;
import java.io.IOException;
public abstract class AbstractDistributor implements Distributor {
protected final Directory[] delegates;
protected AbstractDistributor(DirectoryService directoryService) throws IOException {
delegates = directoryService.build();
public Directory[] all() {
return delegates;
public Directory primary() {
return delegates[0];
public Directory any() {
if (delegates.length == 1) {
return delegates[0];
} else {
return doAny();
protected abstract Directory doAny();
@ -0,0 +1,44 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store.distributor;
import org.apache.lucene.store.Directory;
* Keeps track of available directories and selects a directory
* based on some distribution strategy
public interface Distributor {
* Returns primary directory (typically first directory in the list)
Directory primary();
* Returns all directories
Directory[] all();
* Selects one of the directories based on distribution strategy
Directory any();
@ -0,0 +1,61 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store.distributor;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.store.DirectoryService;
import java.io.IOException;
* Implements directory distributor that always return the directory is the most available space
public class LeastUsedDistributor extends AbstractDistributor {
public LeastUsedDistributor(DirectoryService directoryService) throws IOException {
public Directory doAny() {
Directory directory = null;
long size = Long.MIN_VALUE;
for (Directory delegate : delegates) {
if (delegate instanceof FSDirectory) {
long currentSize = ((FSDirectory) delegate).getDirectory().getUsableSpace();
if (currentSize > size) {
size = currentSize;
directory = delegate;
} else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) {
directory = delegate;
} else {
} else {
directory = delegate; // really, make sense to have multiple directories for FS
return directory;
@ -0,0 +1,68 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store.distributor;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.store.DirectoryService;
import java.io.IOException;
* Implements directory distributor that picks a directory at random. The probability of selecting a directory
* is proportional to the amount of usable space in this directory.
public class RandomWeightedDistributor extends AbstractDistributor {
public RandomWeightedDistributor(DirectoryService directoryService) throws IOException {
public Directory doAny() {
long[] usableSpace = new long[delegates.length];
long size = 0;
for (int i = 0; i < delegates.length; i++) {
Directory delegate = delegates[i];
if (delegate instanceof FSDirectory) {
size += ((FSDirectory) delegate).getDirectory().getUsableSpace();
} else {
// makes little sense to use multiple non fs directories
usableSpace[i] = size;
if (size != 0) {
long random = ThreadLocalRandom.current().nextLong(size);
for (int i = 0; i < delegates.length; i++) {
if (usableSpace[i] > random) {
return delegates[i];
// TODO: size is 0 - should we bail out or fall back on random distribution?
return delegates[ThreadLocalRandom.current().nextInt(delegates.length)];
@ -43,7 +43,9 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.ram.RamDirectoryService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
@ -122,11 +124,13 @@ public abstract class AbstractSimpleEngineTests {
protected Store createStore() throws IOException {
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
protected Store createStoreReplica() throws IOException {
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
DirectoryService directoryService = new RamDirectoryService(shardId, EMPTY_SETTINGS);
return new Store(shardId, EMPTY_SETTINGS, null, directoryService, new LeastUsedDistributor(directoryService));
protected Translog createTranslog() {
@ -0,0 +1,183 @@
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.test.unit.index.store.distributor;
import org.apache.lucene.store.*;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.store.distributor.RandomWeightedDistributor;
import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
public class DistributorTests {
public void testEmptyFirstDistributor() throws Exception {
FakeFsDirectory[] directories = new FakeFsDirectory[]{
new FakeFsDirectory("dir0", 10L),
new FakeFsDirectory("dir1", 20L),
new FakeFsDirectory("dir2", 30L)
FakeDirectoryService directoryService = new FakeDirectoryService(directories);
LeastUsedDistributor distributor = new LeastUsedDistributor(directoryService);
for (int i = 0; i < 5; i++) {
assertThat(distributor.any(), equalTo((Directory) directories[2]));
for (int i = 0; i < 5; i++) {
assertThat(distributor.any(), equalTo((Directory) directories[1]));
for (int i = 0; i < 5; i++) {
assertThat(distributor.any(), equalTo((Directory) directories[0]));
public void testRandomWeightedDistributor() throws Exception {
FakeFsDirectory[] directories = new FakeFsDirectory[]{
new FakeFsDirectory("dir0", 10L),
new FakeFsDirectory("dir1", 20L),
new FakeFsDirectory("dir2", 30L)
FakeDirectoryService directoryService = new FakeDirectoryService(directories);
RandomWeightedDistributor randomWeightedDistributor = new RandomWeightedDistributor(directoryService);
for (int i = 0; i < 10000; i++) {
((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount();
for (FakeFsDirectory directory : directories) {
assertThat(directory.getAllocationCount(), greaterThan(0));
assertThat((double) directories[1].getAllocationCount() / directories[0].getAllocationCount(), closeTo(2.0, 0.5));
assertThat((double) directories[2].getAllocationCount() / directories[0].getAllocationCount(), closeTo(3.0, 0.5));
for (FakeFsDirectory directory : directories) {
for (int i = 0; i < 1000; i++) {
((FakeFsDirectory) randomWeightedDistributor.any()).incrementAllocationCount();
assertThat(directories[0].getAllocationCount(), greaterThan(0));
assertThat(directories[1].getAllocationCount(), equalTo(0));
assertThat(directories[2].getAllocationCount(), greaterThan(0));
public static class FakeDirectoryService implements DirectoryService {
private final Directory[] directories;
public FakeDirectoryService(Directory[] directories) {
this.directories = directories;
public Directory[] build() throws IOException {
return directories;
public long throttleTimeInNanos() {
return 0;
public void renameFile(Directory dir, String from, String to) throws IOException {
public void fullDelete(Directory dir) throws IOException {
public static class FakeFsDirectory extends FSDirectory {
public int allocationCount;
public FakeFile fakeFile;
public FakeFsDirectory(String path, long usableSpace) throws IOException {
super(new File(path), NoLockFactory.getNoLockFactory());
fakeFile = new FakeFile(path, usableSpace);
allocationCount = 0;
public IndexInput openInput(String name, IOContext context) throws IOException {
throw new UnsupportedOperationException("Shouldn't be called in the test");
public void setUsableSpace(long usableSpace) {
public void incrementAllocationCount() {
public int getAllocationCount() {
return allocationCount;
public void resetAllocationCount() {
allocationCount = 0;
public File getDirectory() {
return fakeFile;
public static class FakeFile extends File {
private long usableSpace;
public FakeFile(String s, long usableSpace) {
this.usableSpace = usableSpace;
public long getUsableSpace() {
return usableSpace;
public void setUsableSpace(long usableSpace) {
this.usableSpace = usableSpace;
Reference in New Issue
Block a user