diff --git a/modules/elasticsearch/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java b/modules/elasticsearch/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java index ddbc03931a3..836033841f1 100644 --- a/modules/elasticsearch/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java +++ b/modules/elasticsearch/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java @@ -46,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class ByteBufferDirectory extends Directory { - private final Map files = new ConcurrentHashMap(); + protected final Map files = new ConcurrentHashMap(); private final ByteBufferAllocator allocator; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java index 3b87166eb16..c20c2f01c3b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryStatus.java @@ -40,6 +40,7 @@ public class RecoveryStatus { } ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); + ConcurrentMap checksums = ConcurrentCollections.newConcurrentMap(); final long startTime = System.currentTimeMillis(); long time; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index 61c349faff3..54dfdcbd52f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -23,6 +23,7 @@ import org.apache.lucene.store.IndexOutput; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.StopWatch; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.VoidStreamable; @@ -33,7 +34,11 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.service.IndexService; -import org.elasticsearch.index.shard.*; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; @@ -41,11 +46,16 @@ import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BaseTransportRequestHandler; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.FutureTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import static org.elasticsearch.common.unit.TimeValue.*; @@ -270,6 +280,7 @@ public class RecoveryTarget extends AbstractComponent { } } peerRecoveryStatus.openIndexOutputs = null; + peerRecoveryStatus.checksums = null; } } @@ -391,6 +402,44 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); + RecoveryStatus onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + if (onGoingRecovery == null) { + // shard is getting closed on us + throw new IndexShardClosedException(shard.shardId()); + } + + // first, we go and move files that were created with the recovery id suffix to + // the actual names, its ok if we have a corrupted index here, since we have replicas + // to recover from in case of a full cluster shutdown just when this code executes... + String suffix = "." + onGoingRecovery.startTime; + Set filesToRename = Sets.newHashSet(); + for (String existingFile : shard.store().directory().listAll()) { + if (existingFile.endsWith(suffix)) { + filesToRename.add(existingFile.substring(0, existingFile.length() - suffix.length())); + } + } + Exception failureToRename = null; + if (!filesToRename.isEmpty()) { + // first, go and delete the existing ones + for (String fileToRename : filesToRename) { + shard.store().directory().deleteFile(fileToRename); + } + for (String fileToRename : filesToRename) { + // now, rename the files... + try { + shard.store().renameFile(fileToRename + suffix, fileToRename); + } catch (Exception e) { + failureToRename = e; + break; + } + } + } + if (failureToRename != null) { + throw failureToRename; + } + // now write checksums + shard.store().writeChecksums(onGoingRecovery.checksums); + for (String existingFile : shard.store().directory().listAll()) { if (!request.snapshotFiles().contains(existingFile)) { try { @@ -425,6 +474,7 @@ public class RecoveryTarget extends AbstractComponent { IndexOutput indexOutput; if (request.position() == 0) { // first request + onGoingRecovery.checksums.remove(request.name()); indexOutput = onGoingRecovery.openIndexOutputs.remove(request.name()); if (indexOutput != null) { try { @@ -435,7 +485,19 @@ public class RecoveryTarget extends AbstractComponent { } // we create an output with no checksum, this is because the pure binary data of the file is not // the checksum (because of seek). We will create the checksum file once copying is done - indexOutput = shard.store().createOutputWithNoChecksum(request.name()); + + // also, we check if the file already exists, if it does, we create a file name based + // on the current recovery "id" and later we make the switch, the reason for that is that + // we only want to overwrite the index files once we copied all over, and not create a + // case where the index is half moved + + String name = request.name(); + if (shard.store().directory().fileExists(name)) { + name = name + "." + onGoingRecovery.startTime; + } + + indexOutput = shard.store().createOutputWithNoChecksum(name); + onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput); } else { indexOutput = onGoingRecovery.openIndexOutputs.get(request.name()); @@ -453,7 +515,7 @@ public class RecoveryTarget extends AbstractComponent { indexOutput.close(); // write the checksum if (request.checksum() != null) { - shard.store().writeChecksum(request.name(), request.checksum()); + onGoingRecovery.checksums.put(request.name(), request.checksum()); } shard.store().directory().sync(Collections.singleton(request.name())); onGoingRecovery.openIndexOutputs.remove(request.name()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java index 7dd7c827721..7588a3fc6a4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.IndexShardComponent; import java.io.IOException; +import java.util.Map; /** * @author kimchy (shay.banon) @@ -41,6 +42,8 @@ public interface Store extends IndexShardComponent { void writeChecksum(String name, String checksum) throws IOException; + void writeChecksums(Map checksums) throws IOException; + StoreFileMetaData metaData(String name) throws IOException; ImmutableMap list() throws IOException; @@ -50,6 +53,11 @@ public interface Store extends IndexShardComponent { */ void deleteContent() throws IOException; + /** + * Renames, note, might not be atomic, and can fail "in the middle". + */ + void renameFile(String from, String to) throws IOException; + /** * Deletes the store completely. For example, in FS ones, also deletes the parent * directory. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java index d2fbb5cf0cf..8edc0fe0be0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java @@ -19,7 +19,12 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.store.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.NativeFSLockFactory; +import org.apache.lucene.store.NoLockFactory; +import org.apache.lucene.store.SimpleFSLockFactory; import org.apache.lucene.store.bytebuffer.ByteBufferDirectory; import org.elasticsearch.cache.memory.ByteBufferCache; import org.elasticsearch.common.collect.ImmutableSet; @@ -33,7 +38,9 @@ import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.support.AbstractStore; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; /** * @author kimchy (shay.banon) @@ -47,11 +54,40 @@ public abstract class FsStore extends AbstractStore { } @Override public void fullDelete() throws IOException { - FileSystemUtils.deleteRecursively(fsDirectory().getFile()); + FileSystemUtils.deleteRecursively(fsDirectory().getDirectory()); // if we are the last ones, delete also the actual index - String[] list = fsDirectory().getFile().getParentFile().list(); + String[] list = fsDirectory().getDirectory().getParentFile().list(); if (list == null || list.length == 0) { - FileSystemUtils.deleteRecursively(fsDirectory().getFile().getParentFile()); + FileSystemUtils.deleteRecursively(fsDirectory().getDirectory().getParentFile()); + } + } + + @Override protected void doRenameFile(String from, String to) throws IOException { + File directory = fsDirectory().getDirectory(); + File old = new File(directory, from); + File nu = new File(directory, to); + if (nu.exists()) + if (!nu.delete()) + throw new IOException("Cannot delete " + nu); + + if (!old.exists()) { + throw new FileNotFoundException("Can't rename from [" + from + "] to [" + to + "], from does not exists"); + } + + boolean renamed = false; + for (int i = 0; i < 3; i++) { + if (old.renameTo(nu)) { + renamed = true; + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + if (!renamed) { + throw new IOException("Failed to rename, from [" + from + "], to [" + to + "]"); } } @@ -62,30 +98,7 @@ public abstract class FsStore extends AbstractStore { LockFactory lockFactory = new NoLockFactory(); if (fsLock.equals("native")) { // TODO LUCENE MONITOR: this is not needed in next Lucene version - lockFactory = new NativeFSLockFactory() { - @Override public void clearLock(String lockName) throws IOException { - // Note that this isn't strictly required anymore - // because the existence of these files does not mean - // they are locked, but, still do this in case people - // really want to see the files go away: - if (lockDir.exists()) { - - // Try to release the lock first - if it's held by another process, this - // method should not silently fail. - // NOTE: makeLock fixes the lock name by prefixing it w/ lockPrefix. - // Therefore it should be called before the code block next which prefixes - // the given name. - makeLock(lockName).release(); - - if (lockPrefix != null) { - lockName = lockPrefix + "-" + lockName; - } - - // As mentioned above, we don't care if the deletion of the file failed. - new File(lockDir, lockName).delete(); - } - } - }; + lockFactory = new NativeFSLockFactory(); } else if (fsLock.equals("simple")) { lockFactory = new SimpleFSLockFactory(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java index 0657c2706ed..d1a8ef381e0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/memory/ByteBufferStore.java @@ -20,7 +20,9 @@ package org.elasticsearch.index.store.memory; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.bytebuffer.ByteBufferAllocator; import org.apache.lucene.store.bytebuffer.ByteBufferDirectory; +import org.apache.lucene.store.bytebuffer.ByteBufferFile; import org.elasticsearch.cache.memory.ByteBufferCache; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -29,6 +31,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.support.AbstractStore; +import java.io.FileNotFoundException; import java.io.IOException; /** @@ -36,12 +39,15 @@ import java.io.IOException; */ public class ByteBufferStore extends AbstractStore { + private final CustomByteBufferDirectory bbDirectory; + private final Directory directory; @Inject public ByteBufferStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, ByteBufferCache byteBufferCache) throws IOException { super(shardId, indexSettings, indexStore); - this.directory = wrapDirectory(new ByteBufferDirectory(byteBufferCache)); + this.bbDirectory = new CustomByteBufferDirectory(byteBufferCache); + this.directory = wrapDirectory(bbDirectory); logger.debug("Using [byte_buffer] store"); } @@ -55,4 +61,29 @@ public class ByteBufferStore extends AbstractStore { @Override public boolean suggestUseCompoundFile() { return false; } + + @Override protected void doRenameFile(String from, String to) throws IOException { + bbDirectory.renameTo(from, to); + } + + static class CustomByteBufferDirectory extends ByteBufferDirectory { + + CustomByteBufferDirectory() { + } + + CustomByteBufferDirectory(ByteBufferAllocator allocator) { + super(allocator); + } + + public void renameTo(String from, String to) throws IOException { + ByteBufferFile fromFile = files.get(from); + if (fromFile == null) + throw new FileNotFoundException(from); + ByteBufferFile toFile = files.get(to); + if (toFile != null) { + files.remove(from); + } + files.put(to, fromFile); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java index 4a0101ca1d9..21d9e48fad9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/ram/RamStore.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.store.ram; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.store.RAMFile; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; @@ -28,6 +29,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.support.AbstractStore; +import java.io.FileNotFoundException; import java.io.IOException; /** @@ -35,11 +37,14 @@ import java.io.IOException; */ public class RamStore extends AbstractStore { - private Directory directory; + private final CustomRAMDirectory ramDirectory; + + private final Directory directory; @Inject public RamStore(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) throws IOException { super(shardId, indexSettings, indexStore); - this.directory = wrapDirectory(new RAMDirectory()); + this.ramDirectory = new CustomRAMDirectory(); + this.directory = wrapDirectory(ramDirectory); logger.debug("Using [ram] Store"); } @@ -53,4 +58,23 @@ public class RamStore extends AbstractStore { @Override public boolean suggestUseCompoundFile() { return false; } + + @Override protected void doRenameFile(String from, String to) throws IOException { + ramDirectory.renameTo(from, to); + } + + static class CustomRAMDirectory extends RAMDirectory { + + public synchronized void renameTo(String from, String to) throws IOException { + RAMFile fromFile = fileMap.get(from); + if (fromFile == null) + throw new FileNotFoundException(from); + RAMFile toFile = fileMap.get(to); + if (toFile != null) { + sizeInBytes.addAndGet(-fileLength(from)); + fileMap.remove(from); + } + fileMap.put(to, fromFile); + } + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java index e52e83e4545..074cfca2d4e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java @@ -19,7 +19,11 @@ package org.elasticsearch.index.store.support; -import org.apache.lucene.store.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockFactory; import org.elasticsearch.common.Strings; import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.ImmutableMap; @@ -121,6 +125,18 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen return Directories.estimateSize(directory()); } + @Override public void renameFile(String from, String to) throws IOException { + doRenameFile(from, to); + synchronized (mutex) { + StoreFileMetaData fromMetaData = filesMetadata.get(from); // we should always find this one + StoreFileMetaData toMetaData = new StoreFileMetaData(fromMetaData.name(), fromMetaData.length(), fromMetaData.lastModified(), fromMetaData.checksum()); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(from).put(to, toMetaData).immutableMap(); + files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); + } + } + + protected abstract void doRenameFile(String from, String to) throws IOException; + public static Map readChecksums(Directory dir) throws IOException { long lastFound = -1; for (String name : dir.listAll()) { @@ -202,10 +218,22 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen } } + @Override public void writeChecksums(Map checksums) throws IOException { + // update the metadata to include the checksum and write a new checksums file + synchronized (mutex) { + for (Map.Entry entry : checksums.entrySet()) { + StoreFileMetaData metaData = filesMetadata.get(entry.getKey()); + metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), entry.getValue()); + filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(entry.getKey(), metaData).immutableMap(); + } + writeChecksums(); + } + } + /** * The idea of the store directory is to cache file level meta data, as well as md5 of it */ - class StoreDirectory extends Directory implements ForceSyncDirectory { + protected class StoreDirectory extends Directory implements ForceSyncDirectory { private final Directory delegate;