Make DistributorDirectory not call fsync on sub directories and missing files.
Related to #9145
This commit is contained in:
parent
9090e0381f
commit
999bec1243
|
@ -25,8 +25,11 @@ import org.elasticsearch.index.store.distributor.Distributor;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,8 +105,20 @@ public final class DistributorDirectory extends Directory {
|
||||||
@Override
|
@Override
|
||||||
public void sync(Collection<String> names) throws IOException {
|
public void sync(Collection<String> names) throws IOException {
|
||||||
// no need to sync this operation it could be long running too
|
// no need to sync this operation it could be long running too
|
||||||
for (Directory dir : distributor.all()) {
|
final Map<Directory, Collection<String>> perDirectory = new IdentityHashMap<>();
|
||||||
dir.sync(names);
|
for (String name : names) {
|
||||||
|
final Directory dir = getDirectory(name);
|
||||||
|
Collection<String> dirNames = perDirectory.get(dir);
|
||||||
|
if (dirNames == null) {
|
||||||
|
dirNames = new ArrayList<>();
|
||||||
|
perDirectory.put(dir, dirNames);
|
||||||
|
}
|
||||||
|
dirNames.add(name);
|
||||||
|
}
|
||||||
|
for (Map.Entry<Directory, Collection<String>> entry : perDirectory.entrySet()) {
|
||||||
|
final Directory dir = entry.getKey();
|
||||||
|
final Collection<String> dirNames = entry.getValue();
|
||||||
|
dir.sync(dirNames);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,10 @@
|
||||||
package org.elasticsearch.index.store;
|
package org.elasticsearch.index.store;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.*;
|
import com.carrotsearch.randomizedtesting.annotations.*;
|
||||||
|
import com.carrotsearch.randomizedtesting.generators.RandomInts;
|
||||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexFileNames;
|
import org.apache.lucene.index.IndexFileNames;
|
||||||
import org.apache.lucene.store.*;
|
import org.apache.lucene.store.*;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
@ -36,6 +39,9 @@ import java.io.IOException;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
|
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
|
||||||
@ThreadLeakScope(ThreadLeakScope.Scope.SUITE)
|
@ThreadLeakScope(ThreadLeakScope.Scope.SUITE)
|
||||||
|
@ -157,4 +163,41 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
|
||||||
IOUtils.close(dd);
|
IOUtils.close(dd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSync() throws IOException {
|
||||||
|
final Set<String> syncedFiles = new HashSet<>();
|
||||||
|
final Directory[] directories = new Directory[RandomInts.randomIntBetween(random(), 1, 5)];
|
||||||
|
for (int i = 0; i < directories.length; ++i) {
|
||||||
|
final Directory dir = newDirectory();
|
||||||
|
directories[i] = new FilterDirectory(dir) {
|
||||||
|
@Override
|
||||||
|
public void sync(Collection<String> names) throws IOException {
|
||||||
|
super.sync(names);
|
||||||
|
syncedFiles.addAll(names);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
final Directory directory = new DistributorDirectory(directories);
|
||||||
|
|
||||||
|
for (String file : Arrays.asList("a.bin", "b.bin")) {
|
||||||
|
try (IndexOutput out = directory.createOutput(file, IOContext.DEFAULT)) {
|
||||||
|
out.writeInt(random().nextInt());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncing on a missing file throws an exception
|
||||||
|
try {
|
||||||
|
directory.sync(Arrays.asList("a.bin", "c.bin"));
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertEquals(ImmutableSet.of(), syncedFiles);
|
||||||
|
|
||||||
|
// but syncing on existing files actually delegates
|
||||||
|
directory.sync(Arrays.asList("a.bin", "b.bin"));
|
||||||
|
assertEquals(ImmutableSet.of("a.bin", "b.bin"), syncedFiles);
|
||||||
|
|
||||||
|
directory.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue