[CORE] remove usage of Directory#fileExists
This commit removes the usage of #fileExists which has been deprecated long ago an can be a source of race conditions.
This commit is contained in:
parent
35ede09511
commit
5190cee7fc
|
@ -103,7 +103,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
|
||||
@Override
|
||||
public void deleteFile(String name) throws IOException {
|
||||
getDirectory(name, true, true).deleteFile(name);
|
||||
getDirectory(name, true).deleteFile(name);
|
||||
Directory remove = nameDirMapping.remove(name);
|
||||
assert usePrimary(name) || remove != null : "Tried to delete file " + name + " but couldn't";
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
return getDirectory(name, false, false).createOutput(name, context);
|
||||
return getDirectory(name, false).createOutput(name, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -141,7 +141,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
* @throws IOException if the name has not yet been associated with any directory ie. fi the file does not exists
|
||||
*/
|
||||
private Directory getDirectory(String name) throws IOException {
|
||||
return getDirectory(name, true, false);
|
||||
return getDirectory(name, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -155,23 +155,12 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
* Returns the directory that has previously been associated with this file name or associates the name with a directory
|
||||
* if failIfNotAssociated is set to false.
|
||||
*/
|
||||
private Directory getDirectory(String name, boolean failIfNotAssociated, boolean iterate) throws IOException {
|
||||
private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
|
||||
if (usePrimary(name)) {
|
||||
return distributor.primary();
|
||||
}
|
||||
Directory directory = nameDirMapping.get(name);
|
||||
if (directory == null) {
|
||||
// name is not yet bound to a directory:
|
||||
|
||||
if (iterate) { // in order to get stuff like "write.lock" that might not be written through this directory
|
||||
for (Directory dir : distributor.all()) {
|
||||
if (dir.fileExists(name)) {
|
||||
directory = nameDirMapping.putIfAbsent(name, dir);
|
||||
return directory == null ? dir : directory;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (failIfNotAssociated) {
|
||||
throw new FileNotFoundException("No such file [" + name + "]");
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.lucene.util.Version;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -500,7 +501,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
|
||||
ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
|
||||
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
|
||||
Map<String, String> checksumMap = readLegacyChecksums(directory);
|
||||
Map<String, String> checksumMap = readLegacyChecksums(directory).v1();
|
||||
try {
|
||||
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
|
||||
Version maxVersion = Version.LUCENE_3_0; // we don't know which version was used to write so we take the max version.
|
||||
|
@ -545,7 +546,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
static Map<String, String> readLegacyChecksums(Directory directory) throws IOException {
|
||||
static Tuple<Map<String, String>, Long> readLegacyChecksums(Directory directory) throws IOException {
|
||||
synchronized (directory) {
|
||||
long lastFound = -1;
|
||||
for (String name : directory.listAll()) {
|
||||
|
@ -560,10 +561,10 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
if (lastFound > -1) {
|
||||
try (IndexInput indexInput = directory.openInput(CHECKSUMS_PREFIX + lastFound, IOContext.READONCE)) {
|
||||
indexInput.readInt(); // version
|
||||
return indexInput.readStringStringMap();
|
||||
return new Tuple(indexInput.readStringStringMap(), lastFound);
|
||||
}
|
||||
}
|
||||
return new HashMap<>();
|
||||
return new Tuple(new HashMap<>(), -1l);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -758,19 +759,20 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
|
||||
public synchronized void write(Store store) throws IOException {
|
||||
synchronized (store.distributorDirectory) {
|
||||
Map<String, String> stringStringMap = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory);
|
||||
stringStringMap.putAll(legacyChecksums);
|
||||
if (!stringStringMap.isEmpty()) {
|
||||
writeChecksums(store.directory, stringStringMap);
|
||||
Tuple<Map<String, String>, Long> tuple = MetadataSnapshot.readLegacyChecksums(store.distributorDirectory);
|
||||
tuple.v1().putAll(legacyChecksums);
|
||||
if (!tuple.v1().isEmpty()) {
|
||||
writeChecksums(store.directory, tuple.v1(), tuple.v2());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void writeChecksums(Directory directory, Map<String, String> checksums) throws IOException {
|
||||
String checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
|
||||
while (directory.fileExists(checksumName)) {
|
||||
checksumName = CHECKSUMS_PREFIX + System.currentTimeMillis();
|
||||
synchronized void writeChecksums(Directory directory, Map<String, String> checksums, long lastVersion) throws IOException {
|
||||
long nextVersion = System.currentTimeMillis();
|
||||
while (nextVersion <= lastVersion) {
|
||||
nextVersion = System.currentTimeMillis();
|
||||
}
|
||||
final String checksumName = CHECKSUMS_PREFIX + nextVersion;
|
||||
try (IndexOutput output = directory.createOutput(checksumName, IOContext.DEFAULT)) {
|
||||
output.writeInt(0); // version
|
||||
output.writeStringStringMap(checksums);
|
||||
|
|
|
@ -19,11 +19,15 @@
|
|||
|
||||
package org.elasticsearch.index.deletionpolicy;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -32,14 +36,15 @@ public class SnapshotIndexCommitExistsMatcher extends TypeSafeMatcher<SnapshotIn
|
|||
|
||||
@Override
|
||||
public boolean matchesSafely(SnapshotIndexCommit snapshotIndexCommit) {
|
||||
for (String fileName : snapshotIndexCommit.getFiles()) {
|
||||
try {
|
||||
if (!snapshotIndexCommit.getDirectory().fileExists(fileName)) {
|
||||
try {
|
||||
HashSet<String> files = Sets.newHashSet(snapshotIndexCommit.getDirectory().listAll());
|
||||
for (String fileName : snapshotIndexCommit.getFiles()) {
|
||||
if (files.contains(fileName) == false) {
|
||||
return false;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue