Improve peer recovery of index files to reduce chances of corruption, closes #1158.

This commit is contained in:
Shay Banon 2011-07-26 07:18:44 +03:00
parent 9df339f23b
commit 9aa11f967a
8 changed files with 205 additions and 38 deletions

View File

@ -46,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class ByteBufferDirectory extends Directory {
private final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
protected final Map<String, ByteBufferFile> files = new ConcurrentHashMap<String, ByteBufferFile>();
private final ByteBufferAllocator allocator;

View File

@ -40,6 +40,7 @@ public class RecoveryStatus {
}
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
ConcurrentMap<String, String> checksums = ConcurrentCollections.newConcurrentMap();
final long startTime = System.currentTimeMillis();
long time;

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.VoidStreamable;
@ -33,7 +34,11 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
@ -41,11 +46,16 @@ import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.unit.TimeValue.*;
@ -270,6 +280,7 @@ public class RecoveryTarget extends AbstractComponent {
}
}
peerRecoveryStatus.openIndexOutputs = null;
peerRecoveryStatus.checksums = null;
}
}
@ -391,6 +402,44 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
String suffix = "." + onGoingRecovery.startTime;
Set<String> filesToRename = Sets.newHashSet();
for (String existingFile : shard.store().directory().listAll()) {
if (existingFile.endsWith(suffix)) {
filesToRename.add(existingFile.substring(0, existingFile.length() - suffix.length()));
}
}
Exception failureToRename = null;
if (!filesToRename.isEmpty()) {
// first, go and delete the existing ones
for (String fileToRename : filesToRename) {
shard.store().directory().deleteFile(fileToRename);
}
for (String fileToRename : filesToRename) {
// now, rename the files...
try {
shard.store().renameFile(fileToRename + suffix, fileToRename);
} catch (Exception e) {
failureToRename = e;
break;
}
}
}
if (failureToRename != null) {
throw failureToRename;
}
// now write checksums
shard.store().writeChecksums(onGoingRecovery.checksums);
for (String existingFile : shard.store().directory().listAll()) {
if (!request.snapshotFiles().contains(existingFile)) {
try {
@ -425,6 +474,7 @@ public class RecoveryTarget extends AbstractComponent {
IndexOutput indexOutput;
if (request.position() == 0) {
// first request
onGoingRecovery.checksums.remove(request.name());
indexOutput = onGoingRecovery.openIndexOutputs.remove(request.name());
if (indexOutput != null) {
try {
@ -435,7 +485,19 @@ public class RecoveryTarget extends AbstractComponent {
}
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
indexOutput = shard.store().createOutputWithNoChecksum(request.name());
// also, we check if the file already exists, if it does, we create a file name based
// on the current recovery "id" and later we make the switch, the reason for that is that
// we only want to overwrite the index files once we copied all over, and not create a
// case where the index is half moved
String name = request.name();
if (shard.store().directory().fileExists(name)) {
name = name + "." + onGoingRecovery.startTime;
}
indexOutput = shard.store().createOutputWithNoChecksum(name);
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
} else {
indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
@ -453,7 +515,7 @@ public class RecoveryTarget extends AbstractComponent {
indexOutput.close();
// write the checksum
if (request.checksum() != null) {
shard.store().writeChecksum(request.name(), request.checksum());
onGoingRecovery.checksums.put(request.name(), request.checksum());
}
shard.store().directory().sync(Collections.singleton(request.name()));
onGoingRecovery.openIndexOutputs.remove(request.name());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShardComponent;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -41,6 +42,8 @@ public interface Store extends IndexShardComponent {
void writeChecksum(String name, String checksum) throws IOException;
void writeChecksums(Map<String, String> checksums) throws IOException;
StoreFileMetaData metaData(String name) throws IOException;
ImmutableMap<String, StoreFileMetaData> list() throws IOException;
@ -50,6 +53,11 @@ public interface Store extends IndexShardComponent {
*/
void deleteContent() throws IOException;
/**
* Renames, note, might not be atomic, and can fail "in the middle".
*/
void renameFile(String from, String to) throws IOException;
/**
* Deletes the store completely. For example, in FS ones, also deletes the parent
* directory.

View File

@ -19,7 +19,12 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.collect.ImmutableSet;
@ -33,7 +38,9 @@ import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
/**
* @author kimchy (shay.banon)
@ -47,11 +54,40 @@ public abstract class FsStore extends AbstractStore {
}
@Override public void fullDelete() throws IOException {
FileSystemUtils.deleteRecursively(fsDirectory().getFile());
FileSystemUtils.deleteRecursively(fsDirectory().getDirectory());
// if we are the last ones, delete also the actual index
String[] list = fsDirectory().getFile().getParentFile().list();
String[] list = fsDirectory().getDirectory().getParentFile().list();
if (list == null || list.length == 0) {
FileSystemUtils.deleteRecursively(fsDirectory().getFile().getParentFile());
FileSystemUtils.deleteRecursively(fsDirectory().getDirectory().getParentFile());
}
}
@Override protected void doRenameFile(String from, String to) throws IOException {
File directory = fsDirectory().getDirectory();
File old = new File(directory, from);
File nu = new File(directory, to);
if (nu.exists())
if (!nu.delete())
throw new IOException("Cannot delete " + nu);
if (!old.exists()) {
throw new FileNotFoundException("Can't rename from [" + from + "] to [" + to + "], from does not exists");
}
boolean renamed = false;
for (int i = 0; i < 3; i++) {
if (old.renameTo(nu)) {
renamed = true;
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
if (!renamed) {
throw new IOException("Failed to rename, from [" + from + "], to [" + to + "]");
}
}
@ -62,30 +98,7 @@ public abstract class FsStore extends AbstractStore {
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
// TODO LUCENE MONITOR: this is not needed in next Lucene version
lockFactory = new NativeFSLockFactory() {
@Override public void clearLock(String lockName) throws IOException {
// Note that this isn't strictly required anymore
// because the existence of these files does not mean
// they are locked, but, still do this in case people
// really want to see the files go away:
if (lockDir.exists()) {
// Try to release the lock first - if it's held by another process, this
// method should not silently fail.
// NOTE: makeLock fixes the lock name by prefixing it w/ lockPrefix.
// Therefore it should be called before the code block next which prefixes
// the given name.
makeLock(lockName).release();
if (lockPrefix != null) {
lockName = lockPrefix + "-" + lockName;
}
// As mentioned above, we don't care if the deletion of the file failed.
new File(lockDir, lockName).delete();
}
}
};
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.index.store.memory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.bytebuffer.ByteBufferAllocator;
import org.apache.lucene.store.bytebuffer.ByteBufferDirectory;
import org.apache.lucene.store.bytebuffer.ByteBufferFile;
import org.elasticsearch.cache.memory.ByteBufferCache;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -29,6 +31,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
@ -36,12 +39,15 @@ import java.io.IOException;
*/
public class ByteBufferStore extends AbstractStore {
private final CustomByteBufferDirectory bbDirectory;
private final Directory directory;
@Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException {
super(shardId, indexSettings, indexStore);
this.directory = wrapDirectory(new ByteBufferDirectory(byteBufferCache));
this.bbDirectory = new CustomByteBufferDirectory(byteBufferCache);
this.directory = wrapDirectory(bbDirectory);
logger.debug("Using [byte_buffer] store");
}
@ -55,4 +61,29 @@ public class ByteBufferStore extends AbstractStore {
@Override public boolean suggestUseCompoundFile() {
return false;
}
@Override protected void doRenameFile(String from, String to) throws IOException {
bbDirectory.renameTo(from, to);
}
static class CustomByteBufferDirectory extends ByteBufferDirectory {
CustomByteBufferDirectory() {
}
CustomByteBufferDirectory(ByteBufferAllocator allocator) {
super(allocator);
}
public void renameTo(String from, String to) throws IOException {
ByteBufferFile fromFile = files.get(from);
if (fromFile == null)
throw new FileNotFoundException(from);
ByteBufferFile toFile = files.get(to);
if (toFile != null) {
files.remove(from);
}
files.put(to, fromFile);
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.store.ram;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.RAMFile;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
@ -28,6 +29,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.support.AbstractStore;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
@ -35,11 +37,14 @@ import java.io.IOException;
*/
public class RamStore extends AbstractStore {
private Directory directory;
private final CustomRAMDirectory ramDirectory;
private final Directory directory;
@Inject public RamStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException {
super(shardId, indexSettings, indexStore);
this.directory = wrapDirectory(new RAMDirectory());
this.ramDirectory = new CustomRAMDirectory();
this.directory = wrapDirectory(ramDirectory);
logger.debug("Using [ram] Store");
}
@ -53,4 +58,23 @@ public class RamStore extends AbstractStore {
@Override public boolean suggestUseCompoundFile() {
return false;
}
@Override protected void doRenameFile(String from, String to) throws IOException {
ramDirectory.renameTo(from, to);
}
static class CustomRAMDirectory extends RAMDirectory {
public synchronized void renameTo(String from, String to) throws IOException {
RAMFile fromFile = fileMap.get(from);
if (fromFile == null)
throw new FileNotFoundException(from);
RAMFile toFile = fileMap.get(to);
if (toFile != null) {
sizeInBytes.addAndGet(-fileLength(from));
fileMap.remove(from);
}
fileMap.put(to, fromFile);
}
}
}

View File

@ -19,7 +19,11 @@
package org.elasticsearch.index.store.support;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
@ -121,6 +125,18 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return Directories.estimateSize(directory());
}
@Override public void renameFile(String from, String to) throws IOException {
doRenameFile(from, to);
synchronized (mutex) {
StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one
StoreFileMetaData toMetaData = new StoreFileMetaData(fromMetaData.name(), fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
protected abstract void doRenameFile(String from, String to) throws IOException;
public static Map<String, String> readChecksums(Directory dir) throws IOException {
long lastFound = -1;
for (String name : dir.listAll()) {
@ -202,10 +218,22 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
}
}
@Override public void writeChecksums(Map<String, String> checksums) throws IOException {
// update the metadata to include the checksum and write a new checksums file
synchronized (mutex) {
for (Map.Entry<String, String> entry : checksums.entrySet()) {
StoreFileMetaData metaData = filesMetadata.get(entry.getKey());
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap();
}
writeChecksums();
}
}
/**
* The idea of the store directory is to cache file level meta data, as well as md5 of it
*/
class StoreDirectory extends Directory implements ForceSyncDirectory {
protected class StoreDirectory extends Directory implements ForceSyncDirectory {
private final Directory delegate;