diff --git a/.idea/dictionaries/kimchy.xml b/.idea/dictionaries/kimchy.xml
index 835c47c52c5..cf3699ea307 100644
--- a/.idea/dictionaries/kimchy.xml
+++ b/.idea/dictionaries/kimchy.xml
@@ -84,6 +84,7 @@
multibinder
multicast
multiline
+ multipart
mvel
nanos
newcount
diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
index 9d5c3a58daa..98feef3b657 100644
--- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
+++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java
@@ -42,10 +42,11 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.memory.ByteBufferStore;
-import org.elasticsearch.index.translog.memory.MemoryTranslog;
+import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
+import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -302,7 +303,7 @@ public class SimpleEngineBenchmark {
ThreadPool threadPool = new ScalingThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
- Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new MemoryTranslog(shardId, settings), new LogByteSizeMergePolicyProvider(store),
+ Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()));
engine.start();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java
index 33a0328d784..5c3858cc017 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java
@@ -72,14 +72,11 @@ public class GatewaySnapshotStatus {
final long indexSize;
- final long translogOperations;
-
- public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize, long translogOperations) {
+ public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize) {
this.stage = stage;
this.startTime = startTime;
this.time = time;
this.indexSize = indexSize;
- this.translogOperations = translogOperations;
}
public Stage stage() {
@@ -113,12 +110,4 @@ public class GatewaySnapshotStatus {
public ByteSizeValue getIndexSize() {
return indexSize();
}
-
- public long translogOperations() {
- return this.translogOperations;
- }
-
- public long getTranslogOperations() {
- return translogOperations();
- }
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java
index 05dcdd2a000..67096da83f5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java
@@ -254,7 +254,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeVLong(gatewaySnapshotStatus.startTime);
out.writeVLong(gatewaySnapshotStatus.time);
out.writeVLong(gatewaySnapshotStatus.indexSize);
- out.writeVLong(gatewaySnapshotStatus.translogOperations);
}
}
@@ -285,7 +284,7 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) {
gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()),
- in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
+ in.readVLong(), in.readVLong(), in.readVLong());
}
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java
index 0fd4b626087..1bafcd4677d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java
@@ -239,7 +239,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
break;
}
shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(),
- snapshotStatus.index().totalSize(), snapshotStatus.translog().currentTranslogOperations());
+ snapshotStatus.index().totalSize());
}
return shardStatus;
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java
index 756db6b81a3..6c9d5665297 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/PreferUnallocatedShardUnassignedStrategy.java
@@ -27,12 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
@@ -155,29 +154,29 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) {
BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
try {
- ImmutableMap<String, BlobMetaData> indexBlobsMetaData = indexGateway.listIndexBlobs(shard.id());
+ CommitPoint commitPoint = indexGateway.findCommitPoint(shard.id());
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n");
- for (BlobMetaData md : indexBlobsMetaData.values()) {
- sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n");
+ for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
+ sb.append(" [").append(fileInfo.name()).append("]/[" + fileInfo.physicalName() + "], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
}
sb.append(" node_files:\n");
for (StoreFileMetaData md : storeFilesMetaData) {
- sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n");
+ sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n");
}
logger.debug(sb.toString());
}
- logger.trace("{}: checking for pre_allocation (gateway) on node [{}]\n gateway files", shard, discoNode, indexBlobsMetaData.keySet());
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
- if (indexBlobsMetaData.containsKey(storeFileMetaData.name())) {
- if (indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) {
- logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway (same md5) with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.sizeInBytes()));
- sizeMatched += storeFileMetaData.sizeInBytes();
+ CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(storeFileMetaData.name());
+ if (fileInfo != null) {
+ if (fileInfo.length() == storeFileMetaData.length()) {
+ logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.length()));
+ sizeMatched += storeFileMetaData.length();
} else {
- logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different md5, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.md5(), indexBlobsMetaData.get(storeFileMetaData.name()).md5());
+ logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different size, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.length(), fileInfo.length());
}
} else {
logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name());
@@ -210,8 +209,8 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
IndexStore.StoreFilesMetaData primaryStoreFilesMetaData = primaryNodeStoreFileMetaData.storeFilesMetaData();
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
- if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).sizeInBytes() == storeFileMetaData.sizeInBytes()) {
- sizeMatched += storeFileMetaData.sizeInBytes();
+ if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+ sizeMatched += storeFileMetaData.length();
}
}
if (sizeMatched > lastSizeMatched) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java
deleted file mode 100644
index a6f9b53e797..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/AppendableBlobContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.blobstore;
-
-import org.elasticsearch.common.io.stream.StreamOutput;
-
-import java.io.IOException;
-
-/**
- * @author kimchy (shay.banon)
- */
-public interface AppendableBlobContainer extends BlobContainer {
-
- interface AppendBlobListener {
- void withStream(StreamOutput os) throws IOException;
-
- void onCompleted();
-
- void onFailure(Throwable t);
- }
-
- interface AppendableBlob {
-
- void append(AppendBlobListener listener);
-
- void close();
- }
-
- AppendableBlob appendBlob(String blobName) throws IOException;
-}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java
index f698fde531f..4d6e41379ef 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobMetaData.java
@@ -26,10 +26,5 @@ public interface BlobMetaData {
String name();
- long sizeInBytes();
-
- /**
- * If the blob store support native md5 checksum, return it. Can be null.
- */
- String md5();
+ long length();
}
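Note on the BlobMetaData change: the contract shrinks to a name and a length. The md5() accessor goes away because not every blob store can provide a native checksum (the deleted javadoc said as much), and the commit point metadata introduced below compares file lengths instead. A minimal caller sketch against the new interface; the container argument is assumed to be any existing BlobContainer:

    import org.elasticsearch.common.blobstore.BlobContainer;
    import org.elasticsearch.common.blobstore.BlobMetaData;

    public class ListBlobsExample {
        // Enumerate blobs using the slimmed-down metadata API (name + length only).
        public static void print(BlobContainer container) throws java.io.IOException {
            for (BlobMetaData md : container.listBlobs().values()) {
                System.out.println(md.name() + " [" + md.length() + " bytes]");
            }
        }
    }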
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
index 0b326d1860c..4ac7927e4f8 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
@@ -7,8 +7,6 @@ public interface BlobStore {
ImmutableBlobContainer immutableBlobContainer(BlobPath path);
- AppendableBlobContainer appendableBlobContainer(BlobPath path);
-
void delete(BlobPath path);
void close();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java
index dbfb46b3488..9a0acb7b25d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/AbstractFsBlobContainer.java
@@ -52,7 +52,7 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (File file : files) {
- builder.put(file.getName(), new PlainBlobMetaData(file.getName(), file.length(), null));
+ builder.put(file.getName(), new PlainBlobMetaData(file.getName(), file.length()));
}
return builder.build();
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java
deleted file mode 100644
index 32b37a28b26..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsAppendableBlobContainer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.blobstore.fs;
-
-import org.elasticsearch.common.blobstore.AppendableBlobContainer;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.io.FileSystemUtils;
-import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/**
- * @author kimchy (shay.banon)
- */
-public class FsAppendableBlobContainer extends AbstractFsBlobContainer implements AppendableBlobContainer {
-
- public FsAppendableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
- super(blobStore, blobPath, path);
- }
-
- @Override public AppendableBlob appendBlob(String blobName) throws IOException {
- return new FsAppendableBlob(new File(path, blobName));
- }
-
- private class FsAppendableBlob implements AppendableBlob {
-
- private final File file;
-
- public FsAppendableBlob(File file) throws IOException {
- this.file = file;
- }
-
- @Override public void append(final AppendBlobListener listener) {
- blobStore.executor().execute(new Runnable() {
- @Override public void run() {
- RandomAccessFile raf = null;
- try {
- raf = new RandomAccessFile(file, "rw");
- raf.seek(raf.length());
- listener.withStream(new DataOutputStreamOutput(raf));
- raf.close();
- FileSystemUtils.syncFile(file);
- listener.onCompleted();
- } catch (IOException e) {
- listener.onFailure(e);
- } finally {
- if (raf != null) {
- try {
- raf.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
- }
- });
- }
-
- @Override public void close() {
- // nothing to do there
- }
- }
-}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
index b1d9e5b3c36..8e1acd55c4c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
@@ -19,7 +19,10 @@
package org.elasticsearch.common.blobstore.fs;
-import org.elasticsearch.common.blobstore.*;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.BlobStoreException;
+import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
@@ -76,10 +79,6 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
return new FsImmutableBlobContainer(this, path, buildAndCreate(path));
}
- @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
- return new FsAppendableBlobContainer(this, path, buildAndCreate(path));
- }
-
@Override public void delete(BlobPath path) {
FileSystemUtils.deleteRecursively(buildPath(path));
}
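With the appendable containers removed, BlobStore exposes only immutable containers, so the translog is now snapshotted through the same write path as index files. A sketch of a write against the remaining API, using the asynchronous writeBlob overload with a WriterListener that is visible in the deleted code below; BlobPath.cleanPath() and the path segments are illustrative assumptions:

    import java.io.ByteArrayInputStream;
    import org.elasticsearch.common.blobstore.BlobPath;
    import org.elasticsearch.common.blobstore.BlobStore;
    import org.elasticsearch.common.blobstore.ImmutableBlobContainer;

    public class WriteBlobExample {
        public static void write(BlobStore blobStore, final byte[] data) {
            // assumed layout: indices/{index}/{shard}
            ImmutableBlobContainer container = blobStore.immutableBlobContainer(
                    BlobPath.cleanPath().add("indices").add("test").add("0"));
            container.writeBlob("commit-1", new ByteArrayInputStream(data), data.length,
                    new ImmutableBlobContainer.WriterListener() {
                        @Override public void onCompleted() { /* blob fully written */ }
                        @Override public void onFailure(Throwable t) { /* handle/log failure */ }
                    });
        }
    }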
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java
deleted file mode 100644
index 4ff6e63d3ca..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/ImmutableAppendableBlobContainer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.blobstore.support;
-
-import org.elasticsearch.common.blobstore.AppendableBlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.collect.ImmutableMap;
-import org.elasticsearch.common.collect.Sets;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.Set;
-
-/**
- * An appendable container that uses an immutable container to implement an appendable one.
- *
- * @author kimchy (shay.banon)
- */
-public class ImmutableAppendableBlobContainer extends AbstractBlobContainer implements AppendableBlobContainer {
-
- private final ImmutableBlobContainer container;
-
- public ImmutableAppendableBlobContainer(ImmutableBlobContainer container) {
- super(container.path());
- this.container = container;
- }
-
- @Override public AppendableBlob appendBlob(final String blobName) throws IOException {
- return new AppendableBlob() {
- int part = 0;
-
- @Override public void append(final AppendBlobListener listener) {
- BytesStreamOutput out = new BytesStreamOutput();
- try {
- listener.withStream(out);
- } catch (Exception e) {
- listener.onFailure(e);
- return;
- }
- if (out.size() == 0) {
- // nothing to write, bail
- listener.onCompleted();
- return;
- }
- String partBlobName = blobName + ".a" + (part++);
- // use teh sync one
- ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size());
- container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() {
- @Override public void onCompleted() {
- listener.onCompleted();
- }
-
- @Override public void onFailure(Throwable t) {
- listener.onFailure(t);
- }
- });
- }
-
- @Override public void close() {
-
- }
- };
- }
-
- @Override public void readBlob(final String blobName, final ReadBlobListener listener) {
- container.readBlob(blobName + ".a0", new ReadBlobListener() {
- int part = 0;
-
- @Override public void onPartial(byte[] data, int offset, int size) throws IOException {
- listener.onPartial(data, offset, size);
- }
-
- @Override public void onCompleted() {
- part++;
- if (container.blobExists(blobName + ".a" + part)) {
- container.readBlob(blobName + ".a" + part, this);
- } else {
- listener.onCompleted();
- }
- }
-
- @Override public void onFailure(Throwable t) {
- listener.onFailure(t);
- }
- });
- }
-
- @Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
- return buildVirtualBlobs(container.listBlobs());
- }
-
- @Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
- return buildVirtualBlobs(container.listBlobsByPrefix(blobNamePrefix));
- }
-
- @Override public boolean blobExists(String blobName) {
- return container.blobExists(blobName + ".a0");
- }
-
- @Override public boolean deleteBlob(String blobName) throws IOException {
- container.deleteBlobsByPrefix(blobName + ".");
- return true;
- }
-
- @Override public void deleteBlobsByFilter(BlobNameFilter filter) throws IOException {
- ImmutableMap<String, BlobMetaData> blobs = buildVirtualBlobs(container.listBlobs());
- for (String blobName : blobs.keySet()) {
- if (filter.accept(blobName)) {
- deleteBlob(blobName);
- }
- }
- }
-
- @Override public void deleteBlobsByPrefix(String blobNamePrefix) throws IOException {
- container.deleteBlobsByPrefix(blobNamePrefix);
- }
-
- private ImmutableMap<String, BlobMetaData> buildVirtualBlobs(ImmutableMap<String, BlobMetaData> blobs) {
- Set<String> names = Sets.newHashSet();
- for (BlobMetaData blob : blobs.values()) {
- if (blob.name().endsWith(".a0")) {
- names.add(blob.name().substring(0, blob.name().lastIndexOf(".a0")));
- }
- }
- ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
- for (String name : names) {
- long sizeInBytes = 0;
- if (blobs.containsKey(name)) {
- // no chunking
- sizeInBytes = blobs.get(name).sizeInBytes();
- } else {
- // chunking...
- int part = 0;
- while (true) {
- BlobMetaData md = blobs.get(name + ".a" + part);
- if (md == null) {
- break;
- }
- sizeInBytes += md.sizeInBytes();
- part++;
- }
- }
- builder.put(name, new PlainBlobMetaData(name, sizeInBytes, null));
- }
- return builder.build();
- }
-}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java
index 0266e25016e..65e59c2b5b1 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/blobstore/support/PlainBlobMetaData.java
@@ -28,29 +28,22 @@ public class PlainBlobMetaData implements BlobMetaData {
private final String name;
- private final long sizeInBytes;
+ private final long length;
- private final String md5;
-
- public PlainBlobMetaData(String name, long sizeInBytes, String md5) {
+ public PlainBlobMetaData(String name, long length) {
this.name = name;
- this.sizeInBytes = sizeInBytes;
- this.md5 = md5;
+ this.length = length;
}
@Override public String name() {
return this.name;
}
- @Override public long sizeInBytes() {
- return this.sizeInBytes;
- }
-
- @Override public String md5() {
- return this.md5;
+ @Override public long length() {
+ return this.length;
}
@Override public String toString() {
- return "name[" + name + "], sizeInBytes[" + sizeInBytes + "], md5[" + md5 + "]";
+ return "name [" + name + "], length [" + length + "]";
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java
new file mode 100644
index 00000000000..32a7063e768
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class FileChannelInputStream extends InputStream {
+
+ private final FileChannel channel;
+
+ private long position;
+
+ private long length;
+
+ private ByteBuffer bb = null;
+ private byte[] bs = null; // Invoker's previous array
+ private byte[] b1 = null;
+
+ /**
+ * @param channel The channel to read from
+ * @param position The position to start reading from
+ * @param length The length to read
+ */
+ public FileChannelInputStream(FileChannel channel, long position, long length) {
+ this.channel = channel;
+ this.position = position;
+ this.length = position + length; // easier to work with total length
+ }
+
+ @Override public int read() throws IOException {
+ if (b1 == null) {
+ b1 = new byte[1];
+ }
+ int n = read(b1);
+ if (n == 1) {
+ return b1[0] & 0xff;
+ }
+ return -1;
+ }
+
+ @Override public int read(byte[] bs, int off, int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+
+ if ((length - position) < len) {
+ len = (int) (length - position);
+ }
+
+ if (len == 0) {
+ return -1;
+ }
+
+ ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
+ bb.limit(Math.min(off + len, bb.capacity()));
+ bb.position(off);
+
+ this.bb = bb;
+ this.bs = bs;
+ int read = channel.read(bb, position);
+ if (read > 0) {
+ position += read;
+ }
+ return read;
+ }
+}
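FileChannelInputStream exposes a fixed [position, position + length) window of a FileChannel as an InputStream, which is useful for uploading a large file in fixed-size chunks (compare chunkSize() on the index gateway). A minimal usage sketch; the file path and window size are illustrative:

    import java.io.File;
    import java.io.InputStream;
    import java.io.RandomAccessFile;
    import org.elasticsearch.common.io.FileChannelInputStream;

    public class FileChannelInputStreamExample {
        public static void main(String[] args) throws Exception {
            RandomAccessFile raf = new RandomAccessFile(new File("work/segments_2"), "r");
            try {
                // stream only the first 1 MB of the file
                InputStream in = new FileChannelInputStream(raf.getChannel(), 0, 1024 * 1024);
                byte[] buffer = new byte[8192];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    // hand each chunk to a blob writer, checksum it, etc.
                }
            } finally {
                raf.close();
            }
        }
    }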
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
index 9ee97656e48..b5ea73a9066 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
@@ -154,7 +154,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
try {
- translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
+ translog.newTranslog();
this.nrtResource = buildNrtResource(indexWriter);
} catch (IOException e) {
try {
@@ -346,7 +346,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
- translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
+ translog.newTranslog();
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
} finally {
@@ -355,7 +355,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
} else {
try {
indexWriter.commit();
- translog.newTranslog(IndexReader.getCurrentVersion(store.directory()));
+ translog.newTranslog();
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java
new file mode 100644
index 00000000000..74602d42110
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.gateway;
+
+import org.elasticsearch.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CommitPoint {
+
+ public static class FileInfo {
+ private final String name;
+ private final String physicalName;
+ private final long length;
+
+ public FileInfo(String name, String physicalName, long length) {
+ this.name = name;
+ this.physicalName = physicalName;
+ this.length = length;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public String physicalName() {
+ return this.physicalName;
+ }
+
+ public long length() {
+ return length;
+ }
+ }
+
+ public static enum Type {
+ GENERATED,
+ SAVED
+ }
+
+ private final long version;
+
+ private final String name;
+
+ private final Type type;
+
+ private final ImmutableList<FileInfo> indexFiles;
+
+ private final ImmutableList<FileInfo> translogFiles;
+
+ public CommitPoint(long version, String name, Type type, List<FileInfo> indexFiles, List<FileInfo> translogFiles) {
+ this.version = version;
+ this.name = name;
+ this.type = type;
+ this.indexFiles = ImmutableList.copyOf(indexFiles);
+ this.translogFiles = ImmutableList.copyOf(translogFiles);
+ }
+
+ public long version() {
+ return version;
+ }
+
+ public String name() {
+ return this.name;
+ }
+
+ public Type type() {
+ return this.type;
+ }
+
+ public ImmutableList<FileInfo> indexFiles() {
+ return this.indexFiles;
+ }
+
+ public ImmutableList<FileInfo> translogFiles() {
+ return this.translogFiles;
+ }
+
+ public boolean containPhysicalIndexFile(String physicalName) {
+ return findPhysicalIndexFile(physicalName) != null;
+ }
+
+ public CommitPoint.FileInfo findPhysicalIndexFile(String physicalName) {
+ for (FileInfo file : indexFiles) {
+ if (file.physicalName().equals(physicalName)) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ public CommitPoint.FileInfo findNameFile(String name) {
+ CommitPoint.FileInfo fileInfo = findNameIndexFile(name);
+ if (fileInfo != null) {
+ return fileInfo;
+ }
+ return findNameTranslogFile(name);
+ }
+
+ public CommitPoint.FileInfo findNameIndexFile(String name) {
+ for (FileInfo file : indexFiles) {
+ if (file.name().equals(name)) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ public CommitPoint.FileInfo findNameTranslogFile(String name) {
+ for (FileInfo file : translogFiles) {
+ if (file.name().equals(name)) {
+ return file;
+ }
+ }
+ return null;
+ }
+}
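Each CommitPoint.FileInfo ties a logical blob name to the physical Lucene or translog file it snapshots, together with its length. A small construction/lookup sketch; the logical names ("__1", "__2") and the sizes are illustrative, not the exact generation-based names produced by fileNameFromGeneration in the shard gateway below:

    import java.util.Arrays;
    import org.elasticsearch.index.gateway.CommitPoint;

    public class CommitPointExample {
        public static void main(String[] args) {
            CommitPoint.FileInfo segments = new CommitPoint.FileInfo("__1", "segments_2", 312);
            CommitPoint.FileInfo translog = new CommitPoint.FileInfo("__2", "translog-1", 1024);
            CommitPoint commitPoint = new CommitPoint(1, "commit-1", CommitPoint.Type.GENERATED,
                    Arrays.asList(segments), Arrays.asList(translog));

            // look up by physical (on-disk) name ...
            assert commitPoint.containPhysicalIndexFile("segments_2");
            // ... or by logical (blob) name, across both index and translog files
            assert "translog-1".equals(commitPoint.findNameFile("__2").physicalName());
        }
    }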
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java
new file mode 100644
index 00000000000..3f2bf3d636f
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.gateway;
+
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.builder.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class CommitPoints implements Iterable<CommitPoint> {
+
+ private final ImmutableList<CommitPoint> commitPoints;
+
+ public CommitPoints(List<CommitPoint> commitPoints) {
+ Collections.sort(commitPoints, new Comparator<CommitPoint>() {
+ @Override public int compare(CommitPoint o1, CommitPoint o2) {
+ return (o2.version() < o1.version() ? -1 : (o2.version() == o1.version() ? 0 : 1));
+ }
+ });
+ this.commitPoints = ImmutableList.copyOf(commitPoints);
+ }
+
+ public ImmutableList<CommitPoint> commits() {
+ return this.commitPoints;
+ }
+
+ public boolean hasVersion(long version) {
+ for (CommitPoint commitPoint : commitPoints) {
+ if (commitPoint.version() == version) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public CommitPoint.FileInfo findPhysicalIndexFile(String physicalName) {
+ for (CommitPoint commitPoint : commitPoints) {
+ CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(physicalName);
+ if (fileInfo != null) {
+ return fileInfo;
+ }
+ }
+ return null;
+ }
+
+ public CommitPoint.FileInfo findNameFile(String name) {
+ for (CommitPoint commitPoint : commitPoints) {
+ CommitPoint.FileInfo fileInfo = commitPoint.findNameFile(name);
+ if (fileInfo != null) {
+ return fileInfo;
+ }
+ }
+ return null;
+ }
+
+ @Override public Iterator<CommitPoint> iterator() {
+ return commitPoints.iterator();
+ }
+
+ public static byte[] toXContent(CommitPoint commitPoint) throws Exception {
+ XContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON).prettyPrint();
+ builder.startObject();
+ builder.field("version", commitPoint.version());
+ builder.field("name", commitPoint.name());
+ builder.field("type", commitPoint.type().toString());
+
+ builder.startObject("index_files");
+ for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
+ builder.startObject(fileInfo.name());
+ builder.field("physical_name", fileInfo.physicalName());
+ builder.field("length", fileInfo.length());
+ builder.endObject();
+ }
+ builder.endObject();
+
+ builder.startObject("translog_files");
+ for (CommitPoint.FileInfo fileInfo : commitPoint.translogFiles()) {
+ builder.startObject(fileInfo.name());
+ builder.field("physical_name", fileInfo.physicalName());
+ builder.field("length", fileInfo.length());
+ builder.endObject();
+ }
+ builder.endObject();
+
+ builder.endObject();
+ return builder.copiedBytes();
+ }
+
+ public static CommitPoint fromXContent(byte[] data) throws Exception {
+ XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
+ try {
+ String currentFieldName = null;
+ XContentParser.Token token = parser.nextToken();
+ if (token == null) {
+ // no data...
+ throw new IOException("No commit point data");
+ }
+ long version = -1;
+ String name = null;
+ CommitPoint.Type type = null;
+ List<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
+ List<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ List<CommitPoint.FileInfo> files = null;
+ if ("index_files".equals(currentFieldName) || "indexFiles".equals(currentFieldName)) {
+ files = indexFiles;
+ } else if ("translog_files".equals(currentFieldName) || "translogFiles".equals(currentFieldName)) {
+ files = translogFiles;
+ } else {
+ throw new IOException("Can't handle object with name [" + currentFieldName + "]");
+ }
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token == XContentParser.Token.START_OBJECT) {
+ String fileName = currentFieldName;
+ String physicalName = null;
+ long size = -1;
+ while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+ if (token == XContentParser.Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token.isValue()) {
+ if ("physical_name".equals(currentFieldName) || "physicalName".equals(currentFieldName)) {
+ physicalName = parser.text();
+ } else if ("length".equals(currentFieldName)) {
+ size = parser.longValue();
+ }
+ }
+ }
+ if (physicalName == null) {
+ throw new IOException("Malformed commit, missing physical_name for [" + fileName + "]");
+ }
+ if (size == -1) {
+ throw new IOException("Malformed commit, missing size for [" + fileName + "]");
+ }
+ files.add(new CommitPoint.FileInfo(fileName, physicalName, size));
+ }
+ }
+ } else if (token.isValue()) {
+ if ("version".equals(currentFieldName)) {
+ version = parser.longValue();
+ } else if ("name".equals(currentFieldName)) {
+ name = parser.text();
+ } else if ("type".equals(currentFieldName)) {
+ type = CommitPoint.Type.valueOf(parser.text());
+ }
+ }
+ }
+
+ if (version == -1) {
+ throw new IOException("Malformed commit, missing version");
+ }
+ if (name == null) {
+ throw new IOException("Malformed commit, missing name");
+ }
+ if (type == null) {
+ throw new IOException("Malformed commit, missing type");
+ }
+
+ return new CommitPoint(version, name, type, indexFiles, translogFiles);
+ } finally {
+ parser.close();
+ }
+ }
+}
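CommitPoints sorts its commits newest-first (see the comparator above), so commits().get(0) is always the latest commit point; findCommitPoint in the next file relies on exactly that. The toXContent/fromXContent pair round-trips a commit point through JSON. A sketch with empty file lists:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.elasticsearch.index.gateway.CommitPoint;
    import org.elasticsearch.index.gateway.CommitPoints;

    public class CommitPointsExample {
        public static void main(String[] args) throws Exception {
            List<CommitPoint.FileInfo> none = new ArrayList<CommitPoint.FileInfo>();
            CommitPoint v1 = new CommitPoint(1, "commit-1", CommitPoint.Type.GENERATED, none, none);
            CommitPoint v2 = new CommitPoint(2, "commit-2", CommitPoint.Type.SAVED, none, none);

            // JSON round trip preserves version, name, type and the file lists
            CommitPoint parsed = CommitPoints.fromXContent(CommitPoints.toXContent(v2));
            assert parsed.version() == 2 && parsed.type() == CommitPoint.Type.SAVED;

            // newest-first ordering: get(0) is the highest version
            CommitPoints commits = new CommitPoints(new ArrayList<CommitPoint>(Arrays.asList(v1, v2)));
            assert commits.commits().get(0).version() == 2;
        }
    }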
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java
index 91cdcd2bcc4..8f9d456a2f4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java
@@ -237,7 +237,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
StringBuilder sb = new StringBuilder();
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n");
- sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
+ sb.append(" translog : id [").append(lastTranslogId).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
logger.debug(sb.toString());
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java
index ed8c1ec72a9..59663b86819 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java
@@ -120,8 +120,6 @@ public class SnapshotStatus {
}
public static class Translog {
- private volatile int currentTranslogOperations;
-
private long startTime;
private long time;
@@ -140,13 +138,5 @@ public class SnapshotStatus {
public void time(long time) {
this.time = time;
}
-
- public void addTranslogOperations(long count) {
- this.currentTranslogOperations += count;
- }
-
- public long currentTranslogOperations() {
- return this.currentTranslogOperations;
- }
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java
index 5a83d5b6a43..6c400716306 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java
@@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.Gateway;
@@ -32,10 +33,13 @@ import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.gateway.CommitPoint;
+import org.elasticsearch.index.gateway.CommitPoints;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException;
+import java.util.List;
/**
* @author kimchy (shay.banon)
@@ -65,9 +69,24 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
this.indexPath = this.gateway.basePath().add("indices").add(index.name());
}
- public ImmutableMap<String, BlobMetaData> listIndexBlobs(int shardId) throws IOException {
- ImmutableBlobContainer indexContainer = blobStore.immutableBlobContainer(shardIndexPath(shardId));
- return BlobStoreIndexShardGateway.buildVirtualBlobs(indexContainer, indexContainer.listBlobs(), null);
+ public CommitPoint findCommitPoint(int shardId) throws IOException {
+ ImmutableBlobContainer container = blobStore.immutableBlobContainer(shardPath(shardId));
+ ImmutableMap<String, BlobMetaData> blobs = container.listBlobs();
+ List<CommitPoint> commitPointsList = Lists.newArrayList();
+ for (String name : blobs.keySet()) {
+ if (name.startsWith("commit-")) {
+ try {
+ commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name)));
+ } catch (Exception e) {
+ logger.warn("failed to read commit point [{}]", name);
+ }
+ }
+ }
+ CommitPoints commitPoints = new CommitPoints(commitPointsList);
+ if (commitPoints.commits().isEmpty()) {
+ return null;
+ }
+ return commitPoints.commits().get(0);
}
@Override public String toString() {
@@ -78,10 +97,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
return blobStore;
}
- public BlobPath indexPath() {
- return this.indexPath;
- }
-
public ByteSizeValue chunkSize() {
return this.chunkSize;
}
@@ -90,14 +105,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
return indexPath.add(Integer.toString(shardId));
}
- public BlobPath shardIndexPath(int shardId) {
- return shardPath(shardId).add("index");
- }
-
- public BlobPath shardTranslogPath(int shardId) {
- return shardPath(shardId).add("translog");
- }
-
@Override public void close(boolean delete) throws ElasticSearchException {
if (delete) {
blobStore.delete(indexPath);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
index ab520d856d3..d177a33ccec 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
@@ -24,22 +24,17 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.common.Digest;
-import org.elasticsearch.common.Hex;
import org.elasticsearch.common.blobstore.*;
-import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
-import org.elasticsearch.common.collect.Sets;
+import org.elasticsearch.common.collect.Iterables;
+import org.elasticsearch.common.collect.Lists;
+import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.CachedStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.*;
import org.elasticsearch.index.settings.IndexSettings;
@@ -53,16 +48,9 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.threadpool.ThreadPool;
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,13 +74,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final BlobPath shardPath;
- protected final ImmutableBlobContainer indexContainer;
-
- protected final AppendableBlobContainer translogContainer;
-
- protected final ConcurrentMap<String, String> cachedMd5 = ConcurrentCollections.newConcurrentMap();
-
- private volatile AppendableBlobContainer.AppendableBlob translogBlob;
+ protected final ImmutableBlobContainer blobContainer;
private volatile RecoveryStatus recoveryStatus;
@@ -114,8 +96,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
this.blobStore = blobStoreIndexGateway.blobStore();
this.shardPath = blobStoreIndexGateway.shardPath(shardId.id());
- this.indexContainer = blobStore.immutableBlobContainer(blobStoreIndexGateway.shardIndexPath(shardId.id()));
- this.translogContainer = blobStore.appendableBlobContainer(blobStoreIndexGateway.shardTranslogPath(shardId.id()));
+ this.blobContainer = blobStore.immutableBlobContainer(shardPath);
this.recoveryStatus = new RecoveryStatus();
}
@@ -133,10 +114,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
@Override public void close(boolean delete) throws ElasticSearchException {
- if (translogBlob != null) {
- translogBlob.close();
- translogBlob = null;
- }
if (delete) {
blobStore.delete(shardPath);
}
@@ -182,85 +159,73 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
private void doSnapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
+ ImmutableMap<String, BlobMetaData> blobs;
+ try {
+ blobs = blobContainer.listBlobs();
+ } catch (IOException e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId, "failed to list blobs", e);
+ }
+
+ long generation = findLatestFileNameGeneration(blobs);
+ CommitPoints commitPoints = buildCommitPoints(blobs);
+
currentSnapshotStatus.index().startTime(System.currentTimeMillis());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.INDEX);
- boolean indexDirty = false;
-
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
- ImmutableMap<String, BlobMetaData> indicesBlobs = null;
- ImmutableMap<String, BlobMetaData> virtualIndicesBlobs = null;
+ final CountDownLatch indexLatch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
+ final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
+ final List<CommitPoint.FileInfo> indexCommitPointFiles = Lists.newArrayList();
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
- if (snapshot.indexChanged()) {
- long time = System.currentTimeMillis();
- indexDirty = true;
-
+ for (final String fileName : snapshotIndexCommit.getFiles()) {
+ StoreFileMetaData storeMetaData;
try {
- indicesBlobs = indexContainer.listBlobs();
+ storeMetaData = store.metaData(fileName);
} catch (IOException e) {
- throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to list indices files from gateway", e);
+ throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
- virtualIndicesBlobs = buildVirtualBlobs(indexContainer, indicesBlobs, cachedMd5);
- // snapshot into the index
- final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
- final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
+ boolean snapshotRequired = false;
+ if (snapshot.indexChanged() && fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
+ snapshotRequired = true; // we want to always snapshot the segment file if the index changed
+ }
- for (final String fileName : snapshotIndexCommit.getFiles()) {
- StoreFileMetaData snapshotFileMetaData;
- try {
- snapshotFileMetaData = store.metaDataWithMd5(fileName);
- } catch (IOException e) {
- throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
- }
- // don't copy over the segments file, it will be copied over later on as part of the
- // final snapshot phase
- if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
- latch.countDown();
- continue;
- }
- // if the file exists in the gateway, and has the same length, don't copy it over
- if (virtualIndicesBlobs.containsKey(fileName) && virtualIndicesBlobs.get(fileName).md5().equals(snapshotFileMetaData.md5())) {
- latch.countDown();
- continue;
- }
-
- // we are snapshotting the file
+ CommitPoint.FileInfo fileInfo = commitPoints.findPhysicalIndexFile(fileName);
+ if (fileInfo == null || fileInfo.length() != storeMetaData.length() || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
+ // the commit point file does not exist in any commit point, or has a different length, or does not fully exist in the listed blobs
+ snapshotRequired = true;
+ }
+ if (snapshotRequired) {
indexNumberOfFiles++;
- indexTotalFilesSize += snapshotFileMetaData.sizeInBytes();
-
- if (virtualIndicesBlobs.containsKey(fileName)) {
- try {
- cachedMd5.remove(fileName);
- indexContainer.deleteBlobsByPrefix(fileName);
- } catch (IOException e) {
- logger.debug("failed to delete [" + fileName + "] before snapshotting, ignoring...");
- }
- }
-
+ indexTotalFilesSize += storeMetaData.length();
+ // create a new FileInfo
try {
- snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileMetaData, latch, failures);
+ CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), storeMetaData.name(), storeMetaData.length());
+ indexCommitPointFiles.add(snapshotFileInfo);
+ snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileInfo, indexLatch, failures);
} catch (IOException e) {
failures.add(e);
- latch.countDown();
+ indexLatch.countDown();
}
+ } else {
+ indexCommitPointFiles.add(fileInfo);
+ indexLatch.countDown();
}
+ }
+ currentSnapshotStatus.index().files(indexNumberOfFiles, indexTotalFilesSize);
- currentSnapshotStatus.index().files(indexNumberOfFiles + 1 /* for the segment */, indexTotalFilesSize);
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- failures.add(e);
- }
- if (!failures.isEmpty()) {
- throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
- }
+ try {
+ indexLatch.await();
+ } catch (InterruptedException e) {
+ failures.add(e);
+ }
+ if (!failures.isEmpty()) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
}
currentSnapshotStatus.index().time(System.currentTimeMillis() - currentSnapshotStatus.index().startTime());
@@ -268,133 +233,97 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.TRANSLOG);
currentSnapshotStatus.translog().startTime(System.currentTimeMillis());
- if (snapshot.newTranslogCreated() || snapshot.sameTranslogNewOperations()) {
- if (snapshot.newTranslogCreated() && translogBlob != null) {
- translogBlob.close();
- translogBlob = null;
- }
-
- if (translogBlob == null) {
- try {
- translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId());
- } catch (IOException e) {
- throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e);
- }
- }
-
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
- translogBlob.append(new AppendableBlobContainer.AppendBlobListener() {
- @Override public void withStream(StreamOutput os) throws IOException {
- if (!snapshot.newTranslogCreated()) {
- translogSnapshot.seekForward(snapshot.lastTranslogPosition());
- }
- BytesStreamOutput bout = CachedStreamOutput.cachedBytes();
- while (translogSnapshot.hasNext()) {
- bout.reset();
-
- bout.writeInt(0);
- TranslogStreams.writeTranslogOperation(bout, translogSnapshot.next());
- bout.flush();
-
- int size = bout.size();
- bout.seek(0);
- bout.writeInt(size - 4);
-
- os.writeBytes(bout.unsafeByteArray(), size);
- currentSnapshotStatus.translog().addTranslogOperations(1);
- }
- }
-
- @Override public void onCompleted() {
- latch.countDown();
- }
-
- @Override public void onFailure(Throwable t) {
- failure.set(t);
- latch.countDown();
- }
- });
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- failure.set(e);
- }
-
- if (failure.get() != null) {
- throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", failure.get());
- }
-
- }
- currentSnapshotStatus.translog().time(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
-
- // now write the segments file
- if (indexDirty) {
- try {
- if (indicesBlobs.containsKey(snapshotIndexCommit.getSegmentsFileName())) {
- cachedMd5.remove(snapshotIndexCommit.getSegmentsFileName());
- indexContainer.deleteBlob(snapshotIndexCommit.getSegmentsFileName());
- }
-
- StoreFileMetaData snapshotFileMetaData = store.metaDataWithMd5(snapshotIndexCommit.getSegmentsFileName());
- indexTotalFilesSize += snapshotFileMetaData.sizeInBytes();
-
- long time = System.currentTimeMillis();
- CountDownLatch latch = new CountDownLatch(1);
- CopyOnWriteArrayList failures = new CopyOnWriteArrayList();
- snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileMetaData, latch, failures);
- latch.await();
- if (!failures.isEmpty()) {
- throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (segment index file)", failures.get(failures.size() - 1));
- }
- } catch (Exception e) {
- if (e instanceof IndexShardGatewaySnapshotFailedException) {
- throw (IndexShardGatewaySnapshotFailedException) e;
- }
- throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
- }
- }
-
- currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FINALIZE);
-
- // delete the old translog
- if (snapshot.newTranslogCreated()) {
- try {
- translogContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() {
- @Override public boolean accept(String blobName) {
- // delete all the ones that are not this translog
- return !blobName.equals("translog-" + translogSnapshot.translogId());
- }
- });
- } catch (Exception e) {
- // ignore
- }
- // NOT doing this one, the above allows us to clean the translog properly
-// try {
-// translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId());
-// } catch (Exception e) {
-// // ignore
-// }
- }
-
- // delete old index files
- if (indexDirty) {
- for (BlobMetaData md : virtualIndicesBlobs.values()) {
- boolean found = false;
- for (final String fileName : snapshotIndexCommit.getFiles()) {
- if (md.name().equals(fileName)) {
- found = true;
+ // Note, we assume the snapshot always starts from "base 0". If we only want the delta, we need to seek forward to lastTranslogPosition
+ List<CommitPoint.FileInfo> translogCommitPointFiles = Lists.newArrayList();
+ boolean snapshotRequired = snapshot.newTranslogCreated();
+ if (!snapshot.newTranslogCreated()) {
+ // if we have a commit point, check that we have all the files listed in it
+ if (!commitPoints.commits().isEmpty()) {
+ CommitPoint commitPoint = commitPoints.commits().get(0);
+ boolean allTranslogFilesExists = true;
+ for (CommitPoint.FileInfo fileInfo : commitPoint.translogFiles()) {
+ if (!commitPointFileExistsInBlobs(fileInfo, blobs)) {
+ allTranslogFilesExists = false;
break;
}
}
- if (!found) {
- try {
- cachedMd5.remove(md.name());
- indexContainer.deleteBlobsByPrefix(md.name());
- } catch (IOException e) {
- logger.debug("failed to delete unused index files, will retry later...", e);
+ // if all the files exist, we can seek forward in case there are new operations; otherwise, we copy everything over again...
+ if (allTranslogFilesExists) {
+ translogCommitPointFiles.addAll(commitPoint.translogFiles());
+ if (snapshot.sameTranslogNewOperations()) {
+ translogSnapshot.seekForward(snapshot.lastTranslogPosition());
+ snapshotRequired = true;
}
+ } else {
+ snapshotRequired = true;
+ }
+ }
+ }
+
+ if (snapshotRequired) {
+ CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes());
+ translogCommitPointFiles.add(addedTranslogFileInfo);
+ try {
+ snapshotTranslog(translogSnapshot, addedTranslogFileInfo);
+ } catch (Exception e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", e);
+ }
+ }
+ currentSnapshotStatus.translog().time(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
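
The translog branch above encodes a three-way decision; restated here as comments for quick reference (a summary of the patch's logic, not new behavior):

    // Translog snapshot decision:
    //   new translog created                        -> copy the whole translog
    //   same translog, all previous blobs present   -> reuse them; if there are new
    //                                                  operations, seek forward and
    //                                                  snapshot only the delta
    //   same translog, some previous blobs missing  -> copy everything over again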
+
+ // now create and write the commit point
+ currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FINALIZE);
+ long version = 0;
+ if (!commitPoints.commits().isEmpty()) {
+ version = commitPoints.commits().iterator().next().version() + 1;
+ }
+ String commitPointName = "commit-" + Long.toString(version, Character.MAX_RADIX);
+ CommitPoint commitPoint = new CommitPoint(version, commitPointName, CommitPoint.Type.GENERATED, indexCommitPointFiles, translogCommitPointFiles);
+ try {
+ byte[] commitPointData = CommitPoints.toXContent(commitPoint);
+ blobContainer.writeBlob(commitPointName, new FastByteArrayInputStream(commitPointData), commitPointData.length);
+ } catch (Exception e) {
+ throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to write commit point", e);
+ }
+
+ // delete all files that are not referenced by any commit point
+ // build a new CommitPoints that includes this commit point and all the saved ones
+ List<CommitPoint> newCommitPointsList = Lists.newArrayList();
+ newCommitPointsList.add(commitPoint);
+ for (CommitPoint point : commitPoints) {
+ if (point.type() == CommitPoint.Type.SAVED) {
+ newCommitPointsList.add(point);
+ }
+ }
+ CommitPoints newCommitPoints = new CommitPoints(newCommitPointsList);
+ // first, go over and delete all the commit points that are no longer referenced
+ for (String blobName : blobs.keySet()) {
+ if (!blobName.startsWith("commit-")) {
+ continue;
+ }
+ long checkedVersion = Long.parseLong(blobName.substring("commit-".length()), Character.MAX_RADIX);
+ if (!newCommitPoints.hasVersion(checkedVersion)) {
+ try {
+ blobContainer.deleteBlob(blobName);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ // now go over all the blobs, and if they don't exist in a commit point, delete them
+ for (String blobName : blobs.keySet()) {
+ String name = blobName;
+ if (name.startsWith("commit-")) {
+ continue;
+ }
+ if (blobName.contains(".part")) {
+ name = blobName.substring(0, blobName.indexOf(".part"));
+ }
+ if (newCommitPoints.findNameFile(name) == null) {
+ try {
+ blobContainer.deleteBlob(blobName);
+ } catch (IOException e) {
+ // ignore, we will delete it later
}
}
}
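
Commit points are named with a radix-36 version so that the cleanup loops above can parse the version straight back out of the blob name. A minimal, self-contained sketch of that round trip (class and method names here are illustrative only):

    public class CommitNaming {
        // "commit-" + version rendered in radix 36, as built in the snapshot path.
        static String blobName(long version) {
            return "commit-" + Long.toString(version, Character.MAX_RADIX);
        }

        // Inverse of blobName(), as used when deleting stale commit points.
        static long version(String blobName) {
            return Long.parseLong(blobName.substring("commit-".length()), Character.MAX_RADIX);
        }

        public static void main(String[] args) {
            String name = blobName(37);      // -> "commit-11" (37 in radix 36)
            assert version(name) == 37;
        }
    }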
@@ -403,46 +332,65 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
this.recoveryStatus = recoveryStatus;
- recoveryStatus.index().startTime(System.currentTimeMillis());
- recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
- recoverIndex();
- recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+ final ImmutableMap<String, BlobMetaData> blobs;
+ try {
+ blobs = blobContainer.listBlobs();
+ } catch (IOException e) {
+ throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
+ }
- recoveryStatus.translog().startTime(System.currentTimeMillis());
- recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
- recoverTranslog();
- recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+ CommitPoints commitPoints = buildCommitPoints(blobs);
+ if (commitPoints.commits().isEmpty()) {
+ recoveryStatus.index().startTime(System.currentTimeMillis());
+ recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+ recoveryStatus.translog().startTime(System.currentTimeMillis());
+ recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
+ return;
+ }
+
+ for (CommitPoint commitPoint : commitPoints) {
+ if (!commitPointExistsInBlobs(commitPoint, blobs)) {
+ logger.warn("listed commit_point [{}]/[{}], but not all files exists, ignoring", commitPoint.name(), commitPoint.version());
+ continue;
+ }
+ try {
+ recoveryStatus.index().startTime(System.currentTimeMillis());
+ recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
+ recoverIndex(commitPoint, blobs);
+ recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
+
+ recoveryStatus.translog().startTime(System.currentTimeMillis());
+ recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
+ recoverTranslog(commitPoint, blobs);
+ recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.translog().startTime());
+ return;
+ } catch (Exception e) {
+ throw new IndexShardGatewayRecoveryException(shardId, "failed to recover commit_point [" + commitPoint.name() + "]/[" + commitPoint.version() + "]", e);
+ }
+ }
+ throw new IndexShardGatewayRecoveryException(shardId, "No commit point data is available in gateway", null);
}
- private void recoverTranslog() throws IndexShardGatewayRecoveryException {
- long translogId;
- try {
- translogId = IndexReader.getCurrentVersion(store.directory());
- } catch (FileNotFoundException e) {
- // no index, that fine
- indexShard.start();
- return;
- } catch (IOException e) {
- throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog, can't read current index version", e);
- }
- if (!translogContainer.blobExists("translog-" + translogId)) {
- // no recovery file found, start the shard and bail
+ private void recoverTranslog(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws IndexShardGatewayRecoveryException {
+ if (commitPoint.translogFiles().isEmpty()) {
+ // no translog files, bail
indexShard.start();
return;
}
-
try {
indexShard.performRecoveryPrepareForTranslog();
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
- translogContainer.readBlob("translog-" + translogId, new BlobContainer.ReadBlobListener() {
+ final Iterator<CommitPoint.FileInfo> transIt = commitPoint.translogFiles().iterator();
+
+ blobContainer.readBlob(transIt.next().name(), new BlobContainer.ReadBlobListener() {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
boolean ignore = false;
- @Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
+ @Override public void onPartial(byte[] data, int offset, int size) throws IOException {
if (ignore) {
return;
}
@@ -493,15 +441,20 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
@Override public synchronized void onCompleted() {
- latch.countDown();
+ if (!transIt.hasNext()) {
+ latch.countDown();
+ return;
+ }
+ blobContainer.readBlob(transIt.next().name(), this);
}
- @Override public synchronized void onFailure(Throwable t) {
+ @Override public void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
+
latch.await();
if (failure.get() != null) {
throw failure.get();
@@ -513,52 +466,40 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
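
The listener above is worth calling out: a single ReadBlobListener chains itself across all the translog files by re-submitting this from onCompleted(). A minimal sketch of the pattern, assuming only the BlobContainer/ReadBlobListener shapes visible in this patch ('readAll' is an illustrative helper, not part of the change):

    void readAll(final BlobContainer container, Iterable<String> blobNames) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        final Iterator<String> it = blobNames.iterator();
        container.readBlob(it.next(), new BlobContainer.ReadBlobListener() {
            @Override public void onPartial(byte[] data, int offset, int size) {
                // consume the current blob's bytes
            }

            @Override public void onCompleted() {
                if (it.hasNext()) {
                    container.readBlob(it.next(), this); // chain to the next blob
                } else {
                    latch.countDown(); // all blobs read
                }
            }

            @Override public void onFailure(Throwable t) {
                failure.set(t);
                latch.countDown();
            }
        });
        latch.await();
        if (failure.get() != null) {
            throw new Exception("failed to read blobs", failure.get());
        }
    }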
- private void recoverIndex() throws IndexShardGatewayRecoveryException {
- final ImmutableMap indicesBlobs;
- try {
- indicesBlobs = indexContainer.listBlobs();
- } catch (IOException e) {
- throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
- }
- ImmutableMap virtualIndicesBlobs = buildVirtualBlobs(indexContainer, indicesBlobs, cachedMd5);
-
+ private void recoverIndex(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws Exception {
int numberOfFiles = 0;
long totalSize = 0;
int numberOfExistingFiles = 0;
long existingTotalSize = 0;
- // filter out only the files that we need to recover, and reuse ones that exists in the store
- List filesToRecover = new ArrayList();
- for (BlobMetaData virtualMd : virtualIndicesBlobs.values()) {
- // if the store has the file, and it has the same length, don't recover it
- try {
- StoreFileMetaData storeMd = store.metaDataWithMd5(virtualMd.name());
- if (storeMd != null && storeMd.md5().equals(virtualMd.md5())) {
- numberOfExistingFiles++;
- existingTotalSize += virtualMd.sizeInBytes();
- totalSize += virtualMd.sizeInBytes();
- if (logger.isTraceEnabled()) {
- logger.trace("not_recovering [{}], exists in local store and has same md5 [{}]", virtualMd.name(), virtualMd.md5());
- }
- } else {
- if (logger.isTraceEnabled()) {
- if (storeMd == null) {
- logger.trace("recovering [{}], does not exists in local store", virtualMd.name());
- } else {
- logger.trace("recovering [{}], exists in local store but has different md5: gateway [{}], local [{}]", virtualMd.name(), virtualMd.md5(), storeMd.md5());
- }
- }
- numberOfFiles++;
- totalSize += virtualMd.sizeInBytes();
- filesToRecover.add(virtualMd);
+ List<CommitPoint.FileInfo> filesToRecover = Lists.newArrayList();
+ for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
+ StoreFileMetaData storeFile = store.metaData(fileInfo.physicalName());
+ if (storeFile != null && !storeFile.name().contains("segment") && storeFile.length() == fileInfo.length()) {
+ numberOfExistingFiles++;
+ existingTotalSize += storeFile.length();
+ totalSize += storeFile.length();
+ if (logger.isTraceEnabled()) {
+ logger.trace("not_recovering [{}], exists in local store and has same length [{}]", fileInfo.physicalName(), fileInfo.length());
}
- } catch (Exception e) {
- filesToRecover.add(virtualMd);
- logger.debug("failed to check local store for existence of [{}]", e, virtualMd.name());
+ } else {
+ if (logger.isTraceEnabled()) {
+ if (storeFile == null) {
+ logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
+ } else {
+ logger.trace("recovering [{}], exists in local store but has different length: gateway [{}], local [{}]", fileInfo.physicalName(), fileInfo.length(), storeFile.length());
+ }
+ }
+ numberOfFiles++;
+ totalSize += fileInfo.length();
+ filesToRecover.add(fileInfo);
}
}
recoveryStatus.index().files(numberOfFiles, totalSize, numberOfExistingFiles, existingTotalSize);
+ if (filesToRecover.isEmpty()) {
+ logger.trace("no files to recover, all exists within the local store");
+ }
if (logger.isTraceEnabled()) {
logger.trace("recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize));
@@ -566,8 +507,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
- for (final BlobMetaData fileToRecover : filesToRecover) {
- recoverFile(fileToRecover, indicesBlobs, latch, failures);
+
+ for (final CommitPoint.FileInfo fileToRecover : filesToRecover) {
+ recoverFile(fileToRecover, blobs, latch, failures);
}
try {
@@ -594,7 +536,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
/// now, go over and clean files that are in the store, but were not in the gateway
try {
for (String storeFile : store.directory().listAll()) {
- if (!virtualIndicesBlobs.containsKey(storeFile)) {
+ if (!commitPoint.containPhysicalIndexFile(storeFile)) {
try {
store.directory().deleteFile(storeFile);
} catch (IOException e) {
@@ -607,42 +549,41 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
- private void recoverFile(final BlobMetaData fileToRecover, final ImmutableMap blobs, final CountDownLatch latch, final List failures) {
+ private void recoverFile(final CommitPoint.FileInfo fileInfo, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
final IndexOutput indexOutput;
try {
- indexOutput = store.directory().createOutput(fileToRecover.name());
+ indexOutput = store.directory().createOutput(fileInfo.physicalName());
} catch (IOException e) {
failures.add(e);
latch.countDown();
return;
}
- String firstFileToRecover = fileToRecover.name();
- if (!blobs.containsKey(fileToRecover.name())) {
+ String firstFileToRecover = fileInfo.name();
+ if (!blobs.containsKey(fileInfo.name())) {
// chunking, append part0 to it
- firstFileToRecover = fileToRecover.name() + ".part0";
+ firstFileToRecover = fileInfo.name() + ".part0";
}
if (!blobs.containsKey(firstFileToRecover)) {
// no file, what to do, what to do?
- logger.warn("no file [{}] to recover, even though it has md5, ignoring it", fileToRecover.name());
+ logger.warn("no file [{}]/[{}] to recover, ignoring it", fileInfo.name(), fileInfo.physicalName());
latch.countDown();
return;
}
final AtomicInteger partIndex = new AtomicInteger();
- final MessageDigest digest = Digest.getMd5Digest();
- indexContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
+
+ blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryStatus.index().addCurrentFilesSize(size);
indexOutput.writeBytes(data, offset, size);
- digest.update(data, offset, size);
}
@Override public synchronized void onCompleted() {
int part = partIndex.incrementAndGet();
- String partName = fileToRecover.name() + ".part" + part;
+ String partName = fileInfo.name() + ".part" + part;
if (blobs.containsKey(partName)) {
// continue with the new part
- indexContainer.readBlob(partName, this);
+ blobContainer.readBlob(partName, this);
return;
} else {
// we are done...
@@ -653,12 +594,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
return;
}
}
- // double check the md5, warn if it does not equal...
- String md5 = Hex.encodeHexString(digest.digest());
- if (!md5.equals(fileToRecover.md5())) {
- logger.warn("file [{}] has different md5, actual read content [{}], store [{}]", fileToRecover.name(), md5, fileToRecover.md5());
- }
-
latch.countDown();
}
@@ -669,13 +604,46 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
});
}
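
recoverFile() relies on a fixed blob layout: a file is stored either whole under its logical name, or as sequential chunks "<name>.part0", "<name>.part1", ... with no gaps. A small helper in that spirit ('firstBlob' is illustrative, not part of the patch):

    // Resolve the first blob to read for a logical file name.
    static String firstBlob(Map<String, BlobMetaData> blobs, String name) {
        if (blobs.containsKey(name)) {
            return name;             // stored as a single blob
        }
        return name + ".part0";     // stored chunked; subsequent parts are .part1, .part2, ...
    }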
- private void snapshotFile(Directory dir, final StoreFileMetaData fileMetaData, final CountDownLatch latch, final List failures) throws IOException {
+ private void snapshotTranslog(Translog.Snapshot snapshot, CommitPoint.FileInfo fileInfo) throws IOException {
+ blobContainer.writeBlob(fileInfo.name(), snapshot.stream(), snapshot.lengthInBytes());
+//
+// long chunkBytes = Long.MAX_VALUE;
+// if (chunkSize != null) {
+// chunkBytes = chunkSize.bytes();
+// }
+//
+// long totalLength = fileInfo.length();
+// long numberOfChunks = totalLength / chunkBytes;
+// if (totalLength % chunkBytes > 0) {
+// numberOfChunks++;
+// }
+// if (numberOfChunks == 0) {
+// numberOfChunks++;
+// }
+//
+// if (numberOfChunks == 1) {
+// blobContainer.writeBlob(fileInfo.name(), snapshot.stream(), snapshot.lengthInBytes());
+// } else {
+// InputStream translogStream = snapshot.stream();
+// long totalLengthLeftToWrite = totalLength;
+// for (int i = 0; i < numberOfChunks; i++) {
+// long lengthToWrite = chunkBytes;
+// if (totalLengthLeftToWrite < chunkBytes) {
+// lengthToWrite = totalLengthLeftToWrite;
+// }
+// blobContainer.writeBlob(fileInfo.name() + ".part" + i, new LimitInputStream(translogStream, lengthToWrite), lengthToWrite);
+// totalLengthLeftToWrite -= lengthToWrite;
+// }
+// }
+ }
+
+ private void snapshotFile(Directory dir, final CommitPoint.FileInfo fileInfo, final CountDownLatch latch, final List<Throwable> failures) throws IOException {
long chunkBytes = Long.MAX_VALUE;
if (chunkSize != null) {
chunkBytes = chunkSize.bytes();
}
- long totalLength = fileMetaData.sizeInBytes();
+ long totalLength = fileInfo.length();
long numberOfChunks = totalLength / chunkBytes;
if (totalLength % chunkBytes > 0) {
numberOfChunks++;
@@ -687,22 +655,22 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final long fNumberOfChunks = numberOfChunks;
final AtomicLong counter = new AtomicLong(numberOfChunks);
for (long i = 0; i < fNumberOfChunks; i++) {
- final long chunkNumber = i;
+ final long partNumber = i;
IndexInput indexInput = null;
try {
- indexInput = dir.openInput(fileMetaData.name());
- indexInput.seek(chunkNumber * chunkBytes);
+ indexInput = dir.openInput(fileInfo.physicalName());
+ indexInput.seek(partNumber * chunkBytes);
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes);
- String blobName = fileMetaData.name();
+ String blobName = fileInfo.name();
if (fNumberOfChunks > 1) {
// if we do chunks, then all of them are in the form of "[xxx].part[N]".
- blobName += ".part" + chunkNumber;
+ blobName += ".part" + partNumber;
}
final IndexInput fIndexInput = indexInput;
- indexContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() {
+ blobContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() {
@Override public void onCompleted() {
try {
fIndexInput.close();
@@ -710,18 +678,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// ignore
}
if (counter.decrementAndGet() == 0) {
- // now, write the expected md5
- byte[] md5 = Digest.md5HexToByteArray(fileMetaData.md5());
- indexContainer.writeBlob(fileMetaData.name() + ".md5", new ByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() {
- @Override public void onCompleted() {
- latch.countDown();
- }
-
- @Override public void onFailure(Throwable t) {
- failures.add(t);
- latch.countDown();
- }
- });
+ latch.countDown();
}
}
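
The chunk-count arithmetic at the top of snapshotFile() is a ceiling division with a one-chunk minimum; pulled out as a standalone sketch with a worked example ('numberOfChunks' as a free function is illustrative):

    static long numberOfChunks(long totalLength, long chunkBytes) {
        long chunks = totalLength / chunkBytes;
        if (totalLength % chunkBytes > 0) {
            chunks++;                // partial trailing chunk
        }
        if (chunks == 0) {
            chunks++;                // even an empty file gets one chunk
        }
        return chunks;
    }
    // numberOfChunks(250, 100) == 3 -> blobs "<name>.part0" .. "<name>.part2";
    // a single-chunk file is written under the plain "<name>".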
@@ -751,48 +708,80 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
- public static ImmutableMap buildVirtualBlobs(ImmutableBlobContainer container, ImmutableMap blobs, @Nullable Map cachedMd5) {
- // create a set of all the actual files based on .md5 extension
- Set names = Sets.newHashSet();
- for (BlobMetaData blob : blobs.values()) {
- if (blob.name().endsWith(".md5")) {
- names.add(blob.name().substring(0, blob.name().lastIndexOf(".md5")));
+ private void writeCommitPoint(CommitPoint commitPoint) throws Exception {
+ byte[] data = CommitPoints.toXContent(commitPoint);
+ blobContainer.writeBlob("commit-" + commitPoint.version(), new FastByteArrayInputStream(data), data.length);
+ }
+
+ private boolean commitPointExistsInBlobs(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) {
+ for (CommitPoint.FileInfo fileInfo : Iterables.concat(commitPoint.indexFiles(), commitPoint.translogFiles())) {
+ if (!commitPointFileExistsInBlobs(fileInfo, blobs)) {
+ return false;
}
}
- ImmutableMap.Builder builder = ImmutableMap.builder();
- for (String name : names) {
- long sizeInBytes = 0;
- if (blobs.containsKey(name)) {
- // no chunking
- sizeInBytes = blobs.get(name).sizeInBytes();
- } else {
- // chunking...
- int part = 0;
- while (true) {
- BlobMetaData md = blobs.get(name + ".part" + part);
- if (md == null) {
- break;
- }
- sizeInBytes += md.sizeInBytes();
- part++;
+ return true;
+ }
+
+ private boolean commitPointFileExistsInBlobs(CommitPoint.FileInfo fileInfo, ImmutableMap<String, BlobMetaData> blobs) {
+ BlobMetaData blobMetaData = blobs.get(fileInfo.name());
+ if (blobMetaData != null) {
+ if (blobMetaData.length() != fileInfo.length()) {
+ return false;
+ }
+ } else if (blobs.containsKey(fileInfo.name() + ".part0")) {
+ // multi-part file, sum up the sizes of the parts and check
+ int part = 0;
+ long totalSize = 0;
+ while (true) {
+ blobMetaData = blobs.get(fileInfo.name() + ".part" + part++);
+ if (blobMetaData == null) {
+ break;
}
+ totalSize += blobMetaData.length();
+ }
+ if (totalSize != fileInfo.length()) {
+ return false;
+ }
+ } else {
+ // no file, neither an exact match nor a multipart blob
+ return false;
+ }
+ return true;
+ }
+
+ private CommitPoints buildCommitPoints(ImmutableMap<String, BlobMetaData> blobs) {
+ List<CommitPoint> commitPoints = Lists.newArrayList();
+ for (String name : blobs.keySet()) {
+ if (name.startsWith("commit-")) {
+ try {
+ commitPoints.add(CommitPoints.fromXContent(blobContainer.readBlobFully(name)));
+ } catch (Exception e) {
+ logger.warn("failed to read commit point [{}]", name);
+ }
+ }
+ }
+ return new CommitPoints(commitPoints);
+ }
+
+ private String fileNameFromGeneration(long generation) {
+ return "__" + Long.toString(generation, Character.MAX_RADIX);
+ }
+
+ private long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> blobs) {
+ long generation = -1;
+ for (String name : blobs.keySet()) {
+ if (name.startsWith("commit-")) {
+ continue;
+ }
+ if (name.contains(".part")) {
+ name = name.substring(0, name.indexOf(".part"));
}
- if (cachedMd5 != null && cachedMd5.containsKey(name)) {
- builder.put(name, new PlainBlobMetaData(name, sizeInBytes, cachedMd5.get(name)));
- } else {
- // no md5, get it
- try {
- String md5 = Digest.md5HexFromByteArray(container.readBlobFully(name + ".md5"));
- if (cachedMd5 != null) {
- cachedMd5.put(name, md5);
- }
- builder.put(name, new PlainBlobMetaData(name, sizeInBytes, md5));
- } catch (Exception e) {
- // don't add it!
- }
+ long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX);
+ if (currentGen > generation) {
+ generation = currentGen;
}
}
- return builder.build();
+ return generation;
}
}
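
Index and translog blobs get synthetic names of the form "__" + radix-36 generation, and findLatestFileNameGeneration() strips any ".part<N>" suffix before parsing the digits back. A round-trip sketch under those conventions ('generationFromFileName' is an illustrative inverse, not in the patch):

    static String fileNameFromGeneration(long generation) {
        return "__" + Long.toString(generation, Character.MAX_RADIX);
    }

    static long generationFromFileName(String blobName) {
        String name = blobName;
        if (name.contains(".part")) {
            name = name.substring(0, name.indexOf(".part"));
        }
        return Long.parseLong(name.substring(2) /* skip "__" */, Character.MAX_RADIX);
    }
    // fileNameFromGeneration(36) -> "__10"; generationFromFileName("__10.part3") -> 36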
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
index 00e77230067..3b541c8cd9b 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
@@ -97,28 +97,28 @@ public class RecoverySource extends AbstractComponent {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
- StoreFileMetaData md = shard.store().metaDataWithMd5(name);
+ StoreFileMetaData md = shard.store().metaData(name);
boolean useExisting = false;
- if (request.existingFiles.containsKey(name)) {
- if (md.md5().equals(request.existingFiles.get(name).md5())) {
+ if (request.existingFiles().containsKey(name)) {
+ if (!md.name().contains("segment") && md.length() == request.existingFiles().get(name).length()) {
response.phase1ExistingFileNames.add(name);
- response.phase1ExistingFileSizes.add(md.sizeInBytes());
- existingTotalSize += md.sizeInBytes();
+ response.phase1ExistingFileSizes.add(md.length());
+ existingTotalSize += md.length();
useExisting = true;
if (logger.isTraceEnabled()) {
- logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5());
+ logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.length());
}
}
}
if (!useExisting) {
- if (request.existingFiles.containsKey(name)) {
- logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5());
+ if (request.existingFiles().containsKey(name)) {
+ logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different length: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name).length(), md.length());
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
}
response.phase1FileNames.add(name);
- response.phase1FileSizes.add(md.sizeInBytes());
- totalSize += md.sizeInBytes();
+ response.phase1FileSizes.add(md.length());
+ totalSize += md.length();
}
}
response.phase1TotalSize = totalSize;
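
With md5 gone, the reuse predicate in this hunk becomes: identical length and not a segments-related file (segments files are always re-copied, presumably because length alone cannot distinguish two different commits). A hedged restatement:

    // 'local' is this shard's file metadata, 'remote' the one from existingFiles().
    static boolean canReuse(StoreFileMetaData local, StoreFileMetaData remote) {
        return !local.name().contains("segment") && local.length() == remote.length();
    }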
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java
index 97b8c48c90a..c413eacbcf2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/StartRecoveryRequest.java
@@ -43,7 +43,7 @@ public class StartRecoveryRequest implements Streamable {
private boolean markAsRelocated;
- Map existingFiles;
+ private Map<String, StoreFileMetaData> existingFiles;
StartRecoveryRequest() {
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java
index 94c006f9b69..18453f0cb9a 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/IndexStore.java
@@ -91,7 +91,7 @@ public interface IndexStore extends IndexComponent {
public long totalSizeInBytes() {
long totalSizeInBytes = 0;
for (StoreFileMetaData file : this) {
- totalSizeInBytes += file.sizeInBytes();
+ totalSizeInBytes += file.length();
}
return totalSizeInBytes;
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
index 58bb820f0d4..3619ff22f78 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
@@ -38,12 +38,8 @@ public interface Store extends IndexShardComponent {
StoreFileMetaData metaData(String name) throws IOException;
- StoreFileMetaData metaDataWithMd5(String name) throws IOException;
-
ImmutableMap<String, StoreFileMetaData> list() throws IOException;
- ImmutableMap listWithMd5() throws IOException;
-
/**
* Just deletes the content of the store.
*/
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
index 5cd745566f0..af98cc613d7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
@@ -34,18 +34,15 @@ public class StoreFileMetaData implements Streamable {
private long lastModified;
- private long sizeInBytes;
-
- private String md5;
+ private long length;
StoreFileMetaData() {
}
- public StoreFileMetaData(String name, long sizeInBytes, long lastModified, String md5) {
+ public StoreFileMetaData(String name, long length, long lastModified) {
this.name = name;
this.lastModified = lastModified;
- this.sizeInBytes = sizeInBytes;
- this.md5 = md5;
+ this.length = length;
}
public String name() {
@@ -56,12 +53,8 @@ public class StoreFileMetaData implements Streamable {
return this.lastModified;
}
- public long sizeInBytes() {
- return sizeInBytes;
- }
-
- public String md5() {
- return md5;
+ public long length() {
+ return length;
}
public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException {
@@ -71,25 +64,16 @@ public class StoreFileMetaData implements Streamable {
}
@Override public String toString() {
- return "name[" + name + "], sizeInBytes[" + sizeInBytes + "], md5[" + md5 + "]";
+ return "name [" + name + "], length [" + length + "]";
}
@Override public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
- sizeInBytes = in.readVLong();
- if (in.readBoolean()) {
- md5 = in.readUTF();
- }
+ length = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
- out.writeVLong(sizeInBytes);
- if (md5 == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- out.writeUTF(md5);
- }
+ out.writeVLong(length);
}
}
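
The slimmed-down wire format is now just a UTF name plus a vlong length. A hedged round-trip sketch, assuming the BytesStreamOutput/BytesStreamInput pair already used elsewhere in this patch behaves as their names suggest:

    BytesStreamOutput out = new BytesStreamOutput();
    new StoreFileMetaData("_0.cfs", 1024, System.currentTimeMillis()).writeTo(out);

    StreamInput in = new BytesStreamInput(out.unsafeByteArray());
    StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
    assert md.name().equals("_0.cfs") && md.length() == 1024;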
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java
index 7c82d4097aa..9710283cbb3 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java
@@ -20,14 +20,11 @@
package org.elasticsearch.index.store.fs;
import org.elasticsearch.ElasticSearchIllegalStateException;
-import org.elasticsearch.common.Digest;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.FileSystemUtils;
-import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
@@ -37,10 +34,8 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
/**
* @author kimchy (shay.banon)
@@ -49,8 +44,6 @@ public abstract class FsIndexStore extends AbstractIndexStore {
private final File location;
- private final ConcurrentMap> cachedUnallocatedMd5s = ConcurrentCollections.newConcurrentMap();
-
public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService);
if (nodeEnv.hasNodeFile()) {
@@ -93,7 +86,6 @@ public abstract class FsIndexStore extends AbstractIndexStore {
if (indexService.hasShard(shardId.id())) {
throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted");
}
- cachedUnallocatedMd5s.remove(shardId);
FileSystemUtils.deleteRecursively(shardLocation(shardId));
}
@@ -105,53 +97,13 @@ public abstract class FsIndexStore extends AbstractIndexStore {
if (!shardIndexLocation.exists()) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.of());
}
- ConcurrentMap shardIdCachedMd5s = cachedUnallocatedMd5s.get(shardId);
- if (shardIdCachedMd5s == null) {
- shardIdCachedMd5s = ConcurrentCollections.newConcurrentMap();
- cachedUnallocatedMd5s.put(shardId, shardIdCachedMd5s);
- }
Map<String, StoreFileMetaData> files = Maps.newHashMap();
for (File file : shardIndexLocation.listFiles()) {
- // ignore md5 files
- if (file.getName().endsWith(".md5")) {
- continue;
- }
- // calculate md5
- String md5 = shardIdCachedMd5s.get(file.getName());
- if (md5 == null) {
- // first, see if there is an md5 file associated with it
- File md5File = new File(shardIndexLocation, file.getName() + ".md5");
- if (md5File.exists()) {
- try {
- byte[] md5Bytes = Streams.copyToByteArray(md5File);
- md5 = Digest.md5HexFromByteArray(md5Bytes);
- } catch (Exception e) {
- // ignore, compute...
- }
- }
- // not created as a file, compute it
- if (md5 == null) {
- FileInputStream is = new FileInputStream(file);
- try {
- md5 = Digest.md5Hex(is);
- } finally {
- is.close();
- }
- }
-
- if (md5 != null) {
- shardIdCachedMd5s.put(file.getName(), md5);
- }
- }
- files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), md5));
+ files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
}
return new StoreFilesMetaData(false, shardId, files);
}
- ConcurrentMap cachedShardMd5s(ShardId shardId) {
- return cachedUnallocatedMd5s.get(shardId);
- }
-
public File shardLocation(ShardId shardId) {
return new File(location, Integer.toString(shardId.id()));
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
index b890590ac20..8e93d848bc7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java
@@ -34,7 +34,6 @@ import org.elasticsearch.index.store.support.AbstractStore;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
/**
* @author kimchy (shay.banon)
@@ -45,14 +44,6 @@ public abstract class FsStore extends AbstractStore {
super(shardId, indexSettings, indexStore);
}
- @Override protected String preComputedMd5(String fileName) {
- ConcurrentMap shardIdCachedMd5s = ((FsIndexStore) indexStore).cachedShardMd5s(shardId);
- if (shardIdCachedMd5s == null) {
- return null;
- }
- return shardIdCachedMd5s.get(fileName);
- }
-
@Override public void fullDelete() throws IOException {
FileSystemUtils.deleteRecursively(fsDirectory().getFile());
// if we are the last ones, delete also the actual index
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
index d6523b0c055..4eb40a244d4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java
@@ -53,7 +53,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
if (indexShard == null) {
return listUnallocatedStoreMetaData(shardId);
} else {
- return new StoreFilesMetaData(true, shardId, indexShard.store().listWithMd5());
+ return new StoreFilesMetaData(true, shardId, indexShard.store().list());
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
index d8038710495..1eb4472ca5e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
@@ -20,13 +20,10 @@
package org.elasticsearch.index.store.support;
import org.apache.lucene.store.*;
-import org.elasticsearch.common.Digest;
-import org.elasticsearch.common.Hex;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.lucene.Directories;
-import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
@@ -38,7 +35,6 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.security.MessageDigest;
import java.util.Map;
/**
@@ -72,44 +68,12 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return null;
}
// IndexOutput not closed, does not exists
- if (md.lastModified() == -1 || md.sizeInBytes() == -1) {
+ if (md.lastModified() == -1 || md.length() == -1) {
return null;
}
return md;
}
- @Override public StoreFileMetaData metaDataWithMd5(String name) throws IOException {
- StoreFileMetaData md = metaData(name);
- if (md == null) {
- return null;
- }
- if (md.md5() == null) {
- IndexInput in = directory().openInput(name);
- String md5;
- try {
- InputStreamIndexInput is = new InputStreamIndexInput(in, Long.MAX_VALUE);
- md5 = Digest.md5Hex(is);
- } finally {
- in.close();
- }
- synchronized (mutex) {
- md = metaData(name);
- if (md == null) {
- return null;
- }
- if (md.md5() == null) {
- if (shouldWriteMd5(name)) {
- writeMd5File(directory(), name, md5);
- }
-
- md = new StoreFileMetaData(md.name(), md.sizeInBytes(), md.sizeInBytes(), md5);
- filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
- }
- }
- }
- return md;
- }
-
@Override public ImmutableMap<String, StoreFileMetaData> list() throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) {
@@ -121,17 +85,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return builder.build();
}
- @Override public ImmutableMap listWithMd5() throws IOException {
- ImmutableMap.Builder builder = ImmutableMap.builder();
- for (String name : files) {
- StoreFileMetaData md = metaDataWithMd5(name);
- if (md != null) {
- builder.put(md.name(), md);
- }
- }
- return builder.build();
- }
-
@Override public void deleteContent() throws IOException {
Directories.deleteFiles(directory());
}
@@ -155,21 +108,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
directory().close();
}
- protected String preComputedMd5(String fileName) {
- return null;
- }
-
- private boolean shouldWriteMd5(String name) {
- return !name.startsWith("segments") && !name.endsWith(".md5");
- }
-
- private void writeMd5File(Directory directory, String file, String md5) throws IOException {
- byte[] md5Bytes = Digest.md5HexToByteArray(md5);
- IndexOutput output = directory.createOutput(file + ".md5");
- output.writeBytes(md5Bytes, md5Bytes.length);
- output.close();
- }
-
/**
* The idea of the store directory is to cache file level meta data, as well as md5 of it
*/
@@ -182,23 +120,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) {
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
for (String file : delegate.listAll()) {
- if (file.endsWith(".md5")) {
- // md5 are files we create, ignore them
- continue;
- }
- try {
- String md5 = preComputedMd5(file);
-
- if (md5 != null) {
- if (shouldWriteMd5(file)) {
- writeMd5File(delegate, file, md5);
- }
- }
-
- builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), md5));
- } catch (FileNotFoundException e) {
- // ignore
- }
+ builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file)));
}
filesMetadata = builder.immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@@ -230,23 +152,14 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) {
- metaData = new StoreFileMetaData(metaData.name(), metaData.sizeInBytes(), delegate.fileModified(name), metaData.md5());
+ metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name));
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
}
}
}
@Override public void deleteFile(String name) throws IOException {
- if (name.endsWith(".md5")) {
- // ignore, this should not really happen...
- return;
- }
delegate.deleteFile(name);
- try {
- delegate.deleteFile(name + ".md5");
- } catch (Exception e) {
- // ignore
- }
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@@ -259,8 +172,8 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
throw new FileNotFoundException(name);
}
// not set yet (IndexOutput not closed)
- if (metaData.sizeInBytes() != -1) {
- return metaData.sizeInBytes();
+ if (metaData.length() != -1) {
+ return metaData.length();
}
return delegate.fileLength(name);
}
@@ -268,7 +181,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public IndexOutput createOutput(String name) throws IOException {
IndexOutput out = delegate.createOutput(name);
synchronized (mutex) {
- StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
+ StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
@@ -328,31 +241,15 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
private final String name;
- private MessageDigest digest;
-
private StoreIndexOutput(IndexOutput delegate, String name) {
this.delegate = delegate;
this.name = name;
- if (shouldWriteMd5(name)) {
- this.digest = Digest.getMd5Digest();
- } else {
- this.digest = Digest.NULL_DIGEST;
- }
}
@Override public void close() throws IOException {
delegate.close();
synchronized (mutex) {
- StoreFileMetaData md = filesMetadata.get(name);
- String md5 = md == null ? null : md.md5();
- byte[] digestBytes = digest.digest();
- if (digestBytes != null) {
- md5 = Hex.encodeHexString(digestBytes);
- if (shouldWriteMd5(name)) {
- writeMd5File(directory(), name, md5);
- }
- }
- md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), md5);
+ StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name));
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
@@ -360,12 +257,10 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
- digest.update(b);
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
- digest.update(b, offset, length);
}
// don't override it, base class method simple reads from input and writes to this output
@@ -383,8 +278,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void seek(long pos) throws IOException {
delegate.seek(pos);
- // once we seek, digest is not applicable
- digest = Digest.NULL_DIGEST;
}
@Override public long length() throws IOException {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java
index 4b73125af23..627e56f92b5 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -35,6 +35,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.InputStream;
/**
* @author kimchy (shay.banon)
@@ -61,7 +62,7 @@ public interface Translog extends IndexShardComponent {
* Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called.
*/
- void newTranslog(long id) throws TranslogException;
+ void newTranslog() throws TranslogException;
/**
* Adds a create operation to the transaction log.
@@ -108,6 +109,16 @@ public interface Translog extends IndexShardComponent {
Operation next();
void seekForward(long length);
+
+ /**
+ * Returns a stream of this snapshot.
+ */
+ InputStream stream() throws IOException;
+
+ /**
+ * The length in bytes of this snapshot.
+ */
+ long lengthInBytes();
}
/**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
index 084cdf989e9..f42886c46f6 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java
@@ -20,12 +20,15 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.common.io.FileChannelInputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -70,6 +73,14 @@ public class FsChannelSnapshot implements Translog.Snapshot {
return this.length;
}
+ @Override public InputStream stream() throws IOException {
+ return new FileChannelInputStream(channel, position, lengthInBytes());
+ }
+
+ @Override public long lengthInBytes() {
+ return length - position;
+ }
+
@Override public boolean hasNext() {
try {
if (position > length) {
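
The two new methods expose only the unconsumed tail of the channel: stream() begins at the snapshot's current position and lengthInBytes() is length - position, so after seekForward() a caller copies exactly the delta. A hedged usage sketch (blobContainer as in the gateway code above):

    InputStream in = snapshot.stream();          // begins at snapshot.position()
    long remaining = snapshot.lengthInBytes();   // == length - position
    blobContainer.writeBlob("translog-" + snapshot.translogId(), in, remaining);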
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java
index fd9e1f4e506..642e93d79f1 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java
@@ -26,10 +26,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
/**
* @author kimchy (shay.banon)
@@ -72,6 +69,14 @@ public class FsStreamSnapshot implements Translog.Snapshot {
return this.length;
}
+ @Override public InputStream stream() throws IOException {
+ return dis;
+ }
+
+ @Override public long lengthInBytes() {
+ return length - position;
+ }
+
@Override public boolean hasNext() {
try {
if (position > length) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
index c51703f6769..43c2a317079 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java
@@ -49,7 +49,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final Object mutex = new Object();
- private volatile long id;
+ private volatile long id = 0;
private final AtomicInteger operationCounter = new AtomicInteger();
@@ -87,11 +87,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
return new ByteSizeValue(0, ByteSizeUnit.BYTES);
}
- @Override public void newTranslog(long id) throws TranslogException {
+ @Override public void newTranslog() throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
lastPosition = 0;
- this.id = id;
+ this.id = id + 1;
if (raf != null) {
raf.decreaseRefCount();
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemorySnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemorySnapshot.java
deleted file mode 100644
index dc1f583d17d..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemorySnapshot.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.translog.memory;
-
-import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.index.translog.Translog;
-
-import java.util.Iterator;
-import java.util.Queue;
-
-/**
- * @author kimchy (shay.banon)
- */
-public class MemorySnapshot implements Translog.Snapshot {
-
- private final long id;
-
- private final Iterator operationsIt;
-
- private final long length;
-
- private long position = 0;
-
- public MemorySnapshot(long id, Queue operations, long length) {
- this.id = id;
- this.operationsIt = operations.iterator();
- this.length = length;
- }
-
- @Override public long translogId() {
- return id;
- }
-
- @Override public boolean release() throws ElasticSearchException {
- return true;
- }
-
- @Override public long length() {
- return length;
- }
-
- @Override public long position() {
- return this.position;
- }
-
- @Override public boolean hasNext() {
- return position < length;
- }
-
- @Override public Translog.Operation next() {
- Translog.Operation operation = operationsIt.next();
- position++;
- return operation;
- }
-
- @Override public void seekForward(long length) {
- long numberToSeek = this.position + length;
- while (numberToSeek-- != 0) {
- operationsIt.next();
- }
- this.position += length;
- }
-}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java
deleted file mode 100644
index af0a112e6d1..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.translog.memory;
-
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.ThreadSafe;
-import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
-import org.elasticsearch.index.settings.IndexSettings;
-import org.elasticsearch.index.shard.AbstractIndexShardComponent;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.index.translog.TranslogException;
-
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @author kimchy (shay.banon)
- */
-@ThreadSafe
-public class MemoryTranslog extends AbstractIndexShardComponent implements Translog {
-
- private final Object mutex = new Object();
-
- private final AtomicLong estimatedMemorySize = new AtomicLong();
-
- private volatile long id;
-
- private volatile Queue operations;
-
- private final AtomicInteger operationCounter = new AtomicInteger();
-
- @Inject public MemoryTranslog(ShardId shardId, @IndexSettings Settings indexSettings) {
- super(shardId, indexSettings);
- }
-
- @Override public long currentId() {
- return this.id;
- }
-
- @Override public int size() {
- return operationCounter.get();
- }
-
- @Override public ByteSizeValue estimateMemorySize() {
- return new ByteSizeValue(estimatedMemorySize.get(), ByteSizeUnit.BYTES);
- }
-
- @Override public void newTranslog(long id) {
- synchronized (mutex) {
- estimatedMemorySize.set(0);
- operations = new LinkedTransferQueue();
- operationCounter.set(0);
- this.id = id;
- }
- }
-
- @Override public void add(Operation operation) throws TranslogException {
- operations.add(operation);
- operationCounter.incrementAndGet();
- estimatedMemorySize.addAndGet(operation.estimateSize() + 50);
- }
-
- @Override public Snapshot snapshot() {
- synchronized (mutex) {
- return new MemorySnapshot(currentId(), operations, operationCounter.get());
- }
- }
-
- @Override public Snapshot snapshot(Snapshot snapshot) {
- synchronized (mutex) {
- MemorySnapshot memorySnapshot = (MemorySnapshot) snapshot;
- if (currentId() != snapshot.translogId()) {
- return snapshot();
- }
- MemorySnapshot newSnapshot = new MemorySnapshot(currentId(), operations, operationCounter.get());
- newSnapshot.seekForward(memorySnapshot.position());
- return newSnapshot;
- }
- }
-
- @Override public void close() {
- }
-}
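
With MemoryTranslog gone, every construction site in this change swaps in the file-system translog. A minimal sketch of the substitution, using the same constructor arguments that appear in the updated benchmark and tests below (the "work/fs-translog" location and the trailing boolean are taken from those call sites; what the boolean toggles is not shown in this diff):

    import org.elasticsearch.index.shard.ShardId;
    import org.elasticsearch.index.translog.Translog;
    import org.elasticsearch.index.translog.fs.FsTranslog;

    import java.io.File;

    import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;

    public class TranslogMigrationSketch {
        // previously: new MemoryTranslog(shardId, EMPTY_SETTINGS)
        public static Translog newFsTranslog(ShardId shardId) {
            return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
        }
    }
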
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 575458e81e6..5a3c5392476 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -324,7 +324,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService> implements ClusterStateListener {
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java
new file mode 100644
--- /dev/null
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java
+public class CommitPointsTests {
+
+ private final ESLogger logger = Loggers.getLogger(getClass());
+
+ @Test public void testCommitPointXContent() throws Exception {
+ ArrayList<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
+ indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100));
+ indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200));
+
+ ArrayList<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
+ translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100));
+ translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200));
+
+ CommitPoint commitPoint = new CommitPoint(1, "test", CommitPoint.Type.GENERATED, indexFiles, translogFiles);
+
+ byte[] serialized = CommitPoints.toXContent(commitPoint);
+ logger.info("serialized commit_point {}", new String(serialized));
+
+ CommitPoint desCp = CommitPoints.fromXContent(serialized);
+ assertThat(desCp.version(), equalTo(commitPoint.version()));
+ assertThat(desCp.name(), equalTo(commitPoint.name()));
+
+ assertThat(desCp.indexFiles().size(), equalTo(commitPoint.indexFiles().size()));
+ for (int i = 0; i < desCp.indexFiles().size(); i++) {
+ assertThat(desCp.indexFiles().get(i).name(), equalTo(commitPoint.indexFiles().get(i).name()));
+ assertThat(desCp.indexFiles().get(i).physicalName(), equalTo(commitPoint.indexFiles().get(i).physicalName()));
+ assertThat(desCp.indexFiles().get(i).length(), equalTo(commitPoint.indexFiles().get(i).length()));
+ }
+
+ assertThat(desCp.translogFiles().size(), equalTo(commitPoint.translogFiles().size()));
+ for (int i = 0; i < desCp.translogFiles().size(); i++) {
+ assertThat(desCp.translogFiles().get(i).name(), equalTo(commitPoint.translogFiles().get(i).name()));
+ assertThat(desCp.translogFiles().get(i).physicalName(), equalTo(commitPoint.translogFiles().get(i).physicalName()));
+ assertThat(desCp.translogFiles().get(i).length(), equalTo(commitPoint.translogFiles().get(i).length()));
+ }
+ }
+}
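
Each CommitPoint.FileInfo pairs a logical snapshot name with a physical file name and a length, which is exactly what the assertions above exercise. A hypothetical lookup helper (not part of this change, and assuming only the accessors used in the test) to illustrate how the triple is typically consumed:

    // Hypothetical helper: resolve a commit-point file by its logical name.
    static CommitPoint.FileInfo findByName(java.util.List<CommitPoint.FileInfo> files, String name) {
        for (CommitPoint.FileInfo info : files) {
            if (name.equals(info.name())) {
                return info; // physicalName() and length() then locate the bytes on disk
            }
        }
        return null;
    }
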
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java
index cfd4a772acf..c382d910bbb 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/shard/SimpleIndexShardTests.java
@@ -38,13 +38,15 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.ram.RamStore;
-import org.elasticsearch.index.translog.memory.MemoryTranslog;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.File;
import java.io.IOException;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
@@ -72,7 +74,7 @@ public class SimpleIndexShardTests {
SnapshotDeletionPolicy policy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Store store = new RamStore(shardId, settings, null);
- MemoryTranslog translog = new MemoryTranslog(shardId, settings);
+ Translog translog = new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
Engine engine = new RobinEngine(shardId, settings, store, policy, translog,
new LogByteSizeMergePolicyProvider(store), new SerialMergeSchedulerProvider(shardId, settings),
analysisService, new SimilarityService(shardId.index()));
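
Since the shard test now writes a real translog under work/fs-translog, its tearDown presumably needs to clear that directory between runs; a sketch using only java.io so as not to assume any helper APIs:

    // Best-effort recursive delete for the test work directory (sketch).
    static void deleteRecursively(java.io.File file) {
        java.io.File[] children = file.listFiles();
        if (children != null) {
            for (java.io.File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete(); // ignore the result; this is test cleanup
    }
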
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
index 15ac5ca2840..54deea25542 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java
@@ -42,7 +42,7 @@ public abstract class AbstractSimpleTranslogTests {
@BeforeMethod public void setUp() {
translog = create();
- translog.newTranslog(0);
+ translog.newTranslog();
}
@AfterMethod public void tearDown() {
@@ -99,7 +99,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release();
long firstId = translog.currentId();
- translog.newTranslog(1);
+ translog.newTranslog();
assertThat(translog.currentId(), Matchers.not(equalTo(firstId)));
snapshot = translog.snapshot();
@@ -152,7 +152,7 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Index("test", "2", new byte[]{2}));
- translog.newTranslog(2);
+ translog.newTranslog();
translog.add(new Translog.Index("test", "3", new byte[]{3}));
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/memory/MemorySimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/memory/MemorySimpleTranslogTests.java
deleted file mode 100644
index 78355f4fccc..00000000000
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/memory/MemorySimpleTranslogTests.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.translog.memory;
-
-import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
-import org.elasticsearch.index.translog.Translog;
-
-import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
-
-/**
- * @author kimchy (shay.banon)
- */
-public class MemorySimpleTranslogTests extends AbstractSimpleTranslogTests {
-
- @Override protected Translog create() {
- return new MemoryTranslog(shardId, EMPTY_SETTINGS);
- }
-}
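
An fs-based counterpart to this deleted test presumably exists elsewhere in the change; a sketch of what it would look like (class name and location are assumptions; the constructor mirrors the call sites above):

    public class FsSimpleTranslogTests extends AbstractSimpleTranslogTests {

        @Override protected Translog create() {
            return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
        }
    }
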
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java
index 2941e594d05..031d17c8d61 100644
--- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java
@@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
*/
public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests {
@@ -82,6 +82,11 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
client("server1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
logger.info("Indexing #2");
client("server1").index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
+
+ // perform a gateway snapshot of the index
+ logger.info("Gateway Snapshot");
+ client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
+
logger.info("Deleting #1");
client("server1").delete(deleteRequest("test").type("type1").id("1")).actionGet();
diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java
index 66e7e94a0e4..8f894c36de8 100644
--- a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java
+++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/AbstarctS3BlobContainer.java
@@ -107,7 +107,7 @@ public class AbstarctS3BlobContainer extends AbstractBlobContainer {
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
String name = summary.getKey().substring(keyPath.length());
- blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize(), null));
+ blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
}
if (list.isTruncated()) {
prevListing = list;
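
The third PlainBlobMetaData argument, passed as null at every call site touched here, is dropped. A sketch of the reduced value type as implied by those call sites (the real class lives in org.elasticsearch.common.blobstore.support; the field layout is an assumption):

    public class PlainBlobMetaData {
        private final String name;
        private final long length;

        public PlainBlobMetaData(String name, long length) {
            this.name = name;
            this.length = length;
        }

        public String name() { return name; }
        public long length() { return length; }
    }
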
diff --git a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
index fd686ce592f..c148ba99871 100644
--- a/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
+++ b/plugins/cloud/aws/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobStore.java
@@ -22,11 +22,9 @@ package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
-import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
-import org.elasticsearch.common.blobstore.support.ImmutableAppendableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -92,10 +90,6 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return new S3ImmutableBlobContainer(path, this);
}
- @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
- return new ImmutableAppendableBlobContainer(immutableBlobContainer(path));
- }
-
@Override public void delete(BlobPath path) {
ObjectListing prevListing = null;
while (true) {
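
With appendableBlobContainer removed from both the S3 and HDFS stores, the BlobStore contract narrows to immutable containers plus path deletion. A sketch of the remaining surface as inferred from the implementations in this diff (the method set is an inference, not a verbatim copy of the interface):

    public interface BlobStore {
        ImmutableBlobContainer immutableBlobContainer(BlobPath path);

        void delete(BlobPath path);

        void close();
    }
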
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
index a97fce3642b..49ac1f556e2 100644
--- a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/AbstractHdfsBlobContainer.java
@@ -53,7 +53,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) {
- builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen(), null));
+ builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return builder.build();
}
@@ -69,7 +69,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) {
- builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen(), null));
+ builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
return builder.build();
}
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java
deleted file mode 100644
index f306c25a6ad..00000000000
--- a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsAppendableBlobContainer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.common.blobstore.hdfs;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.elasticsearch.common.blobstore.AppendableBlobContainer;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
-
-import java.io.IOException;
-
-/**
- * @author kimchy (shay.banon)
- */
-public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer implements AppendableBlobContainer {
-
- public HdfsAppendableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
- super(blobStore, blobPath, path);
- }
-
- @Override public AppendableBlob appendBlob(String blobName) throws IOException {
- return new HdfsAppendableBlob(new Path(path, blobName));
- }
-
- private class HdfsAppendableBlob implements AppendableBlob {
-
- private final Path file;
-
- private final FSDataOutputStream fsDataStream;
-
- private final DataOutputStreamOutput out;
-
- public HdfsAppendableBlob(Path file) throws IOException {
- this.file = file;
- this.fsDataStream = blobStore.fileSystem().create(file, true);
- this.out = new DataOutputStreamOutput(fsDataStream);
- }
-
- @Override public void append(final AppendBlobListener listener) {
- blobStore.executor().execute(new Runnable() {
- @Override public void run() {
- try {
- listener.withStream(out);
- out.flush();
- fsDataStream.flush();
- fsDataStream.sync();
- listener.onCompleted();
- } catch (IOException e) {
- listener.onFailure(e);
- }
- }
- });
- }
-
- @Override public void close() {
- try {
- fsDataStream.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
-}
\ No newline at end of file
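
The deleted append path flushed the wrapping stream, then the HDFS stream, then called sync() before signaling completion, so callers only observed onCompleted() after a durability barrier. Note that FSDataOutputStream.sync() was later deprecated in Hadoop in favor of hflush()/hsync(); the equivalent ordering in later releases would be:

    out.flush();            // drain the DataOutputStreamOutput wrapper
    fsDataStream.hflush();  // make bytes visible to new readers
    fsDataStream.hsync();   // force to disk on the datanodes
    listener.onCompleted(); // only after the barrier
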
diff --git a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
index 51a89e64c82..28471403bd4 100644
--- a/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
+++ b/plugins/hadoop/src/main/java/org/elasticsearch/common/blobstore/hdfs/HdfsBlobStore.java
@@ -21,7 +21,6 @@ package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
@@ -81,10 +80,6 @@ public class HdfsBlobStore implements BlobStore {
return new HdfsImmutableBlobContainer(this, path, buildAndCreate(path));
}
- @Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
- return new HdfsAppendableBlobContainer(this, path, buildAndCreate(path));
- }
-
@Override public void delete(BlobPath path) {
try {
fileSystem.delete(buildPath(path), true);