[STORE] Synchronize operations that modify file mappings on DistributorDirectory
The rename(String, String) method doesn't allow this implementation to use a simple concurrent map. There is a race during a rename operation where files are not fully renamed but already visible via #listAll(). This inconsistency can lead to problems when opening commit points since the pending_segments_N as well as segments_N are visible but not yet atomically renamed. Yet, non of the methods that are synced are long running such that adding sychronization doesn't introduce bottlenecks here. The Direcotry#sync(...) method is not synchronized since it doesn't change any mapping nor does it depend on the mapping.
This commit is contained in:
parent
2eccbf50fe
commit
0ff44d4d27
|
@ -22,13 +22,12 @@ import org.apache.lucene.store.*;
|
|||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.math.MathUtils;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.store.distributor.Distributor;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
@ -38,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
public final class DistributorDirectory extends BaseDirectory {
|
||||
|
||||
private final Distributor distributor;
|
||||
private final ConcurrentMap<String, Directory> nameDirMapping = ConcurrentCollections.newConcurrentMap();
|
||||
private final HashMap<String, Directory> nameDirMapping = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Creates a new DistributorDirectory from multiple directories. Note: The first directory in the given array
|
||||
|
@ -80,60 +79,55 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
}
|
||||
|
||||
@Override
|
||||
public final String[] listAll() throws IOException {
|
||||
return nameDirMapping.keySet().toArray(new String[0]);
|
||||
public synchronized final String[] listAll() throws IOException {
|
||||
return nameDirMapping.keySet().toArray(new String[nameDirMapping.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFile(String name) throws IOException {
|
||||
public synchronized void deleteFile(String name) throws IOException {
|
||||
getDirectory(name, true).deleteFile(name);
|
||||
Directory remove = nameDirMapping.remove(name);
|
||||
assert remove != null : "Tried to delete file " + name + " but couldn't";
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fileLength(String name) throws IOException {
|
||||
public synchronized long fileLength(String name) throws IOException {
|
||||
return getDirectory(name).fileLength(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
public synchronized IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
return getDirectory(name, false).createOutput(name, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(Collection<String> names) throws IOException {
|
||||
// no need to sync this operation it could be long running too
|
||||
for (Directory dir : distributor.all()) {
|
||||
dir.sync(names);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameFile(String source, String dest) throws IOException {
|
||||
Directory directory = getDirectory(source);
|
||||
if (nameDirMapping.putIfAbsent(dest, directory) != null) {
|
||||
public synchronized void renameFile(String source, String dest) throws IOException {
|
||||
final Directory directory = getDirectory(source);
|
||||
final Directory targetDir = nameDirMapping.get(dest);
|
||||
if (targetDir != null && targetDir != directory) {
|
||||
throw new IOException("Can't rename file from " + source
|
||||
+ " to: " + dest + ": target file already exists");
|
||||
}
|
||||
boolean success = false;
|
||||
try {
|
||||
directory.renameFile(source, dest);
|
||||
nameDirMapping.remove(source);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
nameDirMapping.remove(dest);
|
||||
}
|
||||
+ " to: " + dest + ": target file already exists in a different directory");
|
||||
}
|
||||
directory.renameFile(source, dest);
|
||||
nameDirMapping.remove(source);
|
||||
nameDirMapping.put(dest, directory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
public synchronized IndexInput openInput(String name, IOContext context) throws IOException {
|
||||
return getDirectory(name).openInput(name, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public synchronized void close() throws IOException {
|
||||
IOUtils.close(distributor.all());
|
||||
}
|
||||
|
||||
|
@ -142,7 +136,7 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
*
|
||||
* @throws IOException if the name has not yet been associated with any directory ie. fi the file does not exists
|
||||
*/
|
||||
private Directory getDirectory(String name) throws IOException {
|
||||
Directory getDirectory(String name) throws IOException { // pkg private for testing
|
||||
return getDirectory(name, true);
|
||||
}
|
||||
|
||||
|
@ -151,36 +145,34 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
* if failIfNotAssociated is set to false.
|
||||
*/
|
||||
private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
|
||||
Directory directory = nameDirMapping.get(name);
|
||||
final Directory directory = nameDirMapping.get(name);
|
||||
if (directory == null) {
|
||||
if (failIfNotAssociated) {
|
||||
throw new FileNotFoundException("No such file [" + name + "]");
|
||||
}
|
||||
// Pick a directory and associate this new file with it:
|
||||
final Directory dir = distributor.any();
|
||||
directory = nameDirMapping.putIfAbsent(name, dir);
|
||||
if (directory == null) {
|
||||
// putIfAbsent did in fact put dir:
|
||||
directory = dir;
|
||||
}
|
||||
assert nameDirMapping.containsKey(name) == false;
|
||||
nameDirMapping.put(name, dir);
|
||||
return dir;
|
||||
}
|
||||
|
||||
return directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLockFactory(LockFactory lockFactory) throws IOException {
|
||||
public synchronized void setLockFactory(LockFactory lockFactory) throws IOException {
|
||||
distributor.primary().setLockFactory(lockFactory);
|
||||
super.setLockFactory(new DistributorLockFactoryWrapper(distributor.primary()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLockID() {
|
||||
public synchronized String getLockID() {
|
||||
return distributor.primary().getLockID();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
public synchronized String toString() {
|
||||
return distributor.toString();
|
||||
}
|
||||
|
||||
|
@ -192,31 +184,33 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
* Basic checks to ensure the internal mapping is consistent - should only be used in assertions
|
||||
*/
|
||||
static boolean assertConsistency(ESLogger logger, DistributorDirectory dir) throws IOException {
|
||||
boolean consistent = true;
|
||||
StringBuilder builder = new StringBuilder();
|
||||
Directory[] all = dir.distributor.all();
|
||||
for (Directory d : all) {
|
||||
for (String file : d.listAll()) {
|
||||
final Directory directory = dir.nameDirMapping.get(file);
|
||||
if (directory == null) {
|
||||
consistent = false;
|
||||
builder.append("File ").append(file)
|
||||
.append(" was not mapped to a directory but exists in one of the distributors directories")
|
||||
.append(System.lineSeparator());
|
||||
} else if (directory != d) {
|
||||
consistent = false;
|
||||
builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
|
||||
.append(" but exists in another distributor directory").append(d)
|
||||
.append(System.lineSeparator());
|
||||
}
|
||||
synchronized (dir) {
|
||||
boolean consistent = true;
|
||||
StringBuilder builder = new StringBuilder();
|
||||
Directory[] all = dir.distributor.all();
|
||||
for (Directory d : all) {
|
||||
for (String file : d.listAll()) {
|
||||
final Directory directory = dir.nameDirMapping.get(file);
|
||||
if (directory == null) {
|
||||
consistent = false;
|
||||
builder.append("File ").append(file)
|
||||
.append(" was not mapped to a directory but exists in one of the distributors directories")
|
||||
.append(System.lineSeparator());
|
||||
} else if (directory != d) {
|
||||
consistent = false;
|
||||
builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
|
||||
.append(" but exists in another distributor directory").append(d)
|
||||
.append(System.lineSeparator());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if (consistent == false) {
|
||||
logger.info(builder.toString());
|
||||
}
|
||||
assert consistent : builder.toString();
|
||||
return consistent; // return boolean so it can be easily be used in asserts
|
||||
}
|
||||
if (consistent == false) {
|
||||
logger.info(builder.toString());
|
||||
}
|
||||
assert consistent: builder.toString();
|
||||
return consistent; // return boolean so it can be easily be used in asserts
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -279,8 +273,12 @@ public final class DistributorDirectory extends BaseDirectory {
|
|||
public boolean obtain() throws IOException {
|
||||
if (delegateLock.obtain()) {
|
||||
if (writesFiles) {
|
||||
assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir);
|
||||
nameDirMapping.putIfAbsent(name, dir);
|
||||
synchronized (DistributorDirectory.this) {
|
||||
assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir);
|
||||
if (nameDirMapping.get(name) == null) {
|
||||
nameDirMapping.put(name, dir);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
|
|
|
@ -158,14 +158,15 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
|
|||
try (IndexOutput out = dd.createOutput("foo.bar", IOContext.DEFAULT)) {
|
||||
out.writeInt(1);
|
||||
}
|
||||
try {
|
||||
dd.renameFile("foo.bar", file);
|
||||
fail("target file already exists");
|
||||
} catch (IOException ex) {
|
||||
// target file already exists
|
||||
assertNotNull(dd);
|
||||
if (dd.getDirectory("foo.bar") != dd.getDirectory(file)) {
|
||||
try {
|
||||
dd.renameFile("foo.bar", file);
|
||||
fail("target file already exists in a different directory");
|
||||
} catch (IOException ex) {
|
||||
// target file already exists
|
||||
}
|
||||
}
|
||||
|
||||
theDir.deleteFile(file);
|
||||
assertTrue(DistributorDirectory.assertConsistency(logger, dd));
|
||||
IOUtils.close(dd);
|
||||
}
|
||||
|
|
|
@ -415,19 +415,28 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
CodecUtil.writeFooter(output);
|
||||
output.close();
|
||||
}
|
||||
try {
|
||||
store.renameFile("foo.bar", "bar.foo");
|
||||
fail("targe file already exists");
|
||||
} catch (IOException ex) {
|
||||
// expected
|
||||
}
|
||||
DistributorDirectory distributorDirectory = DirectoryUtils.getLeaf(store.directory(), DistributorDirectory.class);
|
||||
assertNotNull(distributorDirectory);
|
||||
if (distributorDirectory.getDirectory("foo.bar") != distributorDirectory.getDirectory("bar.foo")) {
|
||||
try {
|
||||
store.renameFile("foo.bar", "bar.foo");
|
||||
fail("target file already exists in a different directory");
|
||||
} catch (IOException ex) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try (IndexInput input = store.directory().openInput("bar.foo", IOContext.DEFAULT)) {
|
||||
assertThat(lastChecksum, equalTo(CodecUtil.checksumEntireFile(input)));
|
||||
try (IndexInput input = store.directory().openInput("bar.foo", IOContext.DEFAULT)) {
|
||||
assertThat(lastChecksum, equalTo(CodecUtil.checksumEntireFile(input)));
|
||||
}
|
||||
assertThat(store.directory().listAll().length, is(2));
|
||||
assertDeleteContent(store, directoryService);
|
||||
IOUtils.close(store);
|
||||
} else {
|
||||
store.renameFile("foo.bar", "bar.foo");
|
||||
assertThat(store.directory().listAll().length, is(1));
|
||||
assertDeleteContent(store, directoryService);
|
||||
IOUtils.close(store);
|
||||
}
|
||||
assertThat(store.directory().listAll().length, is(2));
|
||||
assertDeleteContent(store, directoryService);
|
||||
IOUtils.close(store);
|
||||
}
|
||||
|
||||
public void testCheckIntegrity() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue