initial chunk of work on refactoring the gateway, syntatic gateway files names, commit points that include translog information

This commit is contained in:
kimchy 2010-08-23 01:45:03 +03:00
parent 36ad3b246a
commit 1517fa3d28
50 changed files with 971 additions and 1279 deletions

View File

@ -84,6 +84,7 @@
<w>multibinder</w> <w>multibinder</w>
<w>multicast</w> <w>multicast</w>
<w>multiline</w> <w>multiline</w>
<w>multipart</w>
<w>mvel</w> <w>mvel</w>
<w>nanos</w> <w>nanos</w>
<w>newcount</w> <w>newcount</w>

View File

@ -42,10 +42,11 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.memory.ByteBufferStore; import org.elasticsearch.index.store.memory.ByteBufferStore;
import org.elasticsearch.index.translog.memory.MemoryTranslog; import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool; import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import java.io.File;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -302,7 +303,7 @@ public class SimpleEngineBenchmark {
ThreadPool threadPool = new ScalingThreadPool(); ThreadPool threadPool = new ScalingThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new MemoryTranslog(shardId, settings), new LogByteSizeMergePolicyProvider(store), Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index())); new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()));
engine.start(); engine.start();

View File

@ -72,14 +72,11 @@ public class GatewaySnapshotStatus {
final long indexSize; final long indexSize;
final long translogOperations; public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize) {
public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize, long translogOperations) {
this.stage = stage; this.stage = stage;
this.startTime = startTime; this.startTime = startTime;
this.time = time; this.time = time;
this.indexSize = indexSize; this.indexSize = indexSize;
this.translogOperations = translogOperations;
} }
public Stage stage() { public Stage stage() {
@ -113,12 +110,4 @@ public class GatewaySnapshotStatus {
public ByteSizeValue getIndexSize() { public ByteSizeValue getIndexSize() {
return indexSize(); return indexSize();
} }
public long translogOperations() {
return this.translogOperations;
}
public long getTranslogOperations() {
return translogOperations();
}
} }

View File

@ -254,7 +254,6 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeVLong(gatewaySnapshotStatus.startTime); out.writeVLong(gatewaySnapshotStatus.startTime);
out.writeVLong(gatewaySnapshotStatus.time); out.writeVLong(gatewaySnapshotStatus.time);
out.writeVLong(gatewaySnapshotStatus.indexSize); out.writeVLong(gatewaySnapshotStatus.indexSize);
out.writeVLong(gatewaySnapshotStatus.translogOperations);
} }
} }
@ -285,7 +284,7 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()), gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()),
in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); in.readVLong(), in.readVLong(), in.readVLong());
} }
} }
} }

View File

@ -239,7 +239,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
break; break;
} }
shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(), shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(),
snapshotStatus.index().totalSize(), snapshotStatus.translog().currentTranslogOperations()); snapshotStatus.index().totalSize());
} }
return shardStatus; return shardStatus;

View File

@ -27,12 +27,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -155,29 +154,29 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) { if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) {
BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway(); BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
try { try {
ImmutableMap<String, BlobMetaData> indexBlobsMetaData = indexGateway.listIndexBlobs(shard.id()); CommitPoint commitPoint = indexGateway.findCommitPoint(shard.id());
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n"); StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n"); sb.append(" gateway_files:\n");
for (BlobMetaData md : indexBlobsMetaData.values()) { for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n"); sb.append(" [").append(fileInfo.name()).append("]/[" + fileInfo.physicalName() + "], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
} }
sb.append(" node_files:\n"); sb.append(" node_files:\n");
for (StoreFileMetaData md : storeFilesMetaData) { for (StoreFileMetaData md : storeFilesMetaData) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.sizeInBytes())).append("], md5 [").append(md.md5()).append("]\n"); sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n");
} }
logger.debug(sb.toString()); logger.debug(sb.toString());
} }
logger.trace("{}: checking for pre_allocation (gateway) on node [{}]\n gateway files", shard, discoNode, indexBlobsMetaData.keySet());
long sizeMatched = 0; long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
if (indexBlobsMetaData.containsKey(storeFileMetaData.name())) { CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(storeFileMetaData.name());
if (indexBlobsMetaData.get(storeFileMetaData.name()).md5().equals(storeFileMetaData.md5())) { if (fileInfo != null) {
logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway (same md5) with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.sizeInBytes())); if (fileInfo.length() == storeFileMetaData.length()) {
sizeMatched += storeFileMetaData.sizeInBytes(); logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.length()));
sizeMatched += storeFileMetaData.length();
} else { } else {
logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different md5, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.md5(), indexBlobsMetaData.get(storeFileMetaData.name()).md5()); logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different size, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.length(), fileInfo.length());
} }
} else { } else {
logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name()); logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name());
@ -210,8 +209,8 @@ public class PreferUnallocatedShardUnassignedStrategy extends AbstractComponent
IndexStore.StoreFilesMetaData primaryStoreFilesMetaData = primaryNodeStoreFileMetaData.storeFilesMetaData(); IndexStore.StoreFilesMetaData primaryStoreFilesMetaData = primaryNodeStoreFileMetaData.storeFilesMetaData();
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).sizeInBytes() == storeFileMetaData.sizeInBytes()) { if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
sizeMatched += storeFileMetaData.sizeInBytes(); sizeMatched += storeFileMetaData.length();
} }
} }
if (sizeMatched > lastSizeMatched) { if (sizeMatched > lastSizeMatched) {

View File

@ -1,47 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public interface AppendableBlobContainer extends BlobContainer {
interface AppendBlobListener {
void withStream(StreamOutput os) throws IOException;
void onCompleted();
void onFailure(Throwable t);
}
interface AppendableBlob {
void append(AppendBlobListener listener);
void close();
}
AppendableBlob appendBlob(String blobName) throws IOException;
}

View File

@ -26,10 +26,5 @@ public interface BlobMetaData {
String name(); String name();
long sizeInBytes(); long length();
/**
* If the blob store support native md5 checksum, return it. Can be <tt>null</tt>.
*/
String md5();
} }

View File

@ -7,8 +7,6 @@ public interface BlobStore {
ImmutableBlobContainer immutableBlobContainer(BlobPath path); ImmutableBlobContainer immutableBlobContainer(BlobPath path);
AppendableBlobContainer appendableBlobContainer(BlobPath path);
void delete(BlobPath path); void delete(BlobPath path);
void close(); void close();

View File

@ -52,7 +52,7 @@ public abstract class AbstractFsBlobContainer extends AbstractBlobContainer {
} }
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (File file : files) { for (File file : files) {
builder.put(file.getName(), new PlainBlobMetaData(file.getName(), file.length(), null)); builder.put(file.getName(), new PlainBlobMetaData(file.getName(), file.length()));
} }
return builder.build(); return builder.build();
} }

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* @author kimchy (shay.banon)
*/
public class FsAppendableBlobContainer extends AbstractFsBlobContainer implements AppendableBlobContainer {
public FsAppendableBlobContainer(FsBlobStore blobStore, BlobPath blobPath, File path) {
super(blobStore, blobPath, path);
}
@Override public AppendableBlob appendBlob(String blobName) throws IOException {
return new FsAppendableBlob(new File(path, blobName));
}
private class FsAppendableBlob implements AppendableBlob {
private final File file;
public FsAppendableBlob(File file) throws IOException {
this.file = file;
}
@Override public void append(final AppendBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(file, "rw");
raf.seek(raf.length());
listener.withStream(new DataOutputStreamOutput(raf));
raf.close();
FileSystemUtils.syncFile(file);
listener.onCompleted();
} catch (IOException e) {
listener.onFailure(e);
} finally {
if (raf != null) {
try {
raf.close();
} catch (IOException e) {
// ignore
}
}
}
}
});
}
@Override public void close() {
// nothing to do there
}
}
}

View File

@ -19,7 +19,10 @@
package org.elasticsearch.common.blobstore.fs; package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -76,10 +79,6 @@ public class FsBlobStore extends AbstractComponent implements BlobStore {
return new FsImmutableBlobContainer(this, path, buildAndCreate(path)); return new FsImmutableBlobContainer(this, path, buildAndCreate(path));
} }
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
return new FsAppendableBlobContainer(this, path, buildAndCreate(path));
}
@Override public void delete(BlobPath path) { @Override public void delete(BlobPath path) {
FileSystemUtils.deleteRecursively(buildPath(path)); FileSystemUtils.deleteRecursively(buildPath(path));
} }

View File

@ -1,166 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.support;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Set;
/**
* An appendable container that uses an immutable container to implement an appendable one.
*
* @author kimchy (shay.banon)
*/
public class ImmutableAppendableBlobContainer extends AbstractBlobContainer implements AppendableBlobContainer {
private final ImmutableBlobContainer container;
public ImmutableAppendableBlobContainer(ImmutableBlobContainer container) {
super(container.path());
this.container = container;
}
@Override public AppendableBlob appendBlob(final String blobName) throws IOException {
return new AppendableBlob() {
int part = 0;
@Override public void append(final AppendBlobListener listener) {
BytesStreamOutput out = new BytesStreamOutput();
try {
listener.withStream(out);
} catch (Exception e) {
listener.onFailure(e);
return;
}
if (out.size() == 0) {
// nothing to write, bail
listener.onCompleted();
return;
}
String partBlobName = blobName + ".a" + (part++);
// use teh sync one
ByteArrayInputStream is = new ByteArrayInputStream(out.unsafeByteArray(), 0, out.size());
container.writeBlob(partBlobName, is, out.size(), new ImmutableBlobContainer.WriterListener() {
@Override public void onCompleted() {
listener.onCompleted();
}
@Override public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
}
@Override public void close() {
}
};
}
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
container.readBlob(blobName + ".a0", new ReadBlobListener() {
int part = 0;
@Override public void onPartial(byte[] data, int offset, int size) throws IOException {
listener.onPartial(data, offset, size);
}
@Override public void onCompleted() {
part++;
if (container.blobExists(blobName + ".a" + part)) {
container.readBlob(blobName + ".a" + part, this);
} else {
listener.onCompleted();
}
}
@Override public void onFailure(Throwable t) {
listener.onFailure(t);
}
});
}
@Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
return buildVirtualBlobs(container.listBlobs());
}
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
return buildVirtualBlobs(container.listBlobsByPrefix(blobNamePrefix));
}
@Override public boolean blobExists(String blobName) {
return container.blobExists(blobName + ".a0");
}
@Override public boolean deleteBlob(String blobName) throws IOException {
container.deleteBlobsByPrefix(blobName + ".");
return true;
}
@Override public void deleteBlobsByFilter(BlobNameFilter filter) throws IOException {
ImmutableMap<String, BlobMetaData> blobs = buildVirtualBlobs(container.listBlobs());
for (String blobName : blobs.keySet()) {
if (filter.accept(blobName)) {
deleteBlob(blobName);
}
}
}
@Override public void deleteBlobsByPrefix(String blobNamePrefix) throws IOException {
container.deleteBlobsByPrefix(blobNamePrefix);
}
private ImmutableMap<String, BlobMetaData> buildVirtualBlobs(ImmutableMap<String, BlobMetaData> blobs) {
Set<String> names = Sets.newHashSet();
for (BlobMetaData blob : blobs.values()) {
if (blob.name().endsWith(".a0")) {
names.add(blob.name().substring(0, blob.name().lastIndexOf(".a0")));
}
}
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (String name : names) {
long sizeInBytes = 0;
if (blobs.containsKey(name)) {
// no chunking
sizeInBytes = blobs.get(name).sizeInBytes();
} else {
// chunking...
int part = 0;
while (true) {
BlobMetaData md = blobs.get(name + ".a" + part);
if (md == null) {
break;
}
sizeInBytes += md.sizeInBytes();
part++;
}
}
builder.put(name, new PlainBlobMetaData(name, sizeInBytes, null));
}
return builder.build();
}
}

View File

@ -28,29 +28,22 @@ public class PlainBlobMetaData implements BlobMetaData {
private final String name; private final String name;
private final long sizeInBytes; private final long length;
private final String md5; public PlainBlobMetaData(String name, long length) {
public PlainBlobMetaData(String name, long sizeInBytes, String md5) {
this.name = name; this.name = name;
this.sizeInBytes = sizeInBytes; this.length = length;
this.md5 = md5;
} }
@Override public String name() { @Override public String name() {
return this.name; return this.name;
} }
@Override public long sizeInBytes() { @Override public long length() {
return this.sizeInBytes; return this.length;
}
@Override public String md5() {
return this.md5;
} }
@Override public String toString() { @Override public String toString() {
return "name[" + name + "], sizeInBytes[" + sizeInBytes + "], md5[" + md5 + "]"; return "name [" + name + "], length [" + length + "]";
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author kimchy (shay.banon)
*/
public class FileChannelInputStream extends InputStream {
private final FileChannel channel;
private long position;
private long length;
private ByteBuffer bb = null;
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;
/**
* @param channel The channel to read from
* @param position The position to start reading from
* @param length The length to read
*/
public FileChannelInputStream(FileChannel channel, long position, long length) {
this.channel = channel;
this.position = position;
this.length = position + length; // easier to work with total length
}
@Override public int read() throws IOException {
if (b1 == null) {
b1 = new byte[1];
}
int n = read(b1);
if (n == 1) {
return b1[0] & 0xff;
}
return -1;
}
@Override public int read(byte[] bs, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
if ((length - position) < len) {
len = (int) (length - position);
}
if (len == 0) {
return -1;
}
ByteBuffer bb = ((this.bs == bs) ? this.bb : ByteBuffer.wrap(bs));
bb.limit(Math.min(off + len, bb.capacity()));
bb.position(off);
this.bb = bb;
this.bs = bs;
int read = channel.read(bb, position);
if (read > 0) {
position += read;
}
return read;
}
}

View File

@ -154,7 +154,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
} }
try { try {
translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); translog.newTranslog();
this.nrtResource = buildNrtResource(indexWriter); this.nrtResource = buildNrtResource(indexWriter);
} catch (IOException e) { } catch (IOException e) {
try { try {
@ -346,7 +346,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
AcquirableResource<ReaderSearcherHolder> current = nrtResource; AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter); nrtResource = buildNrtResource(indexWriter);
current.markForClose(); current.markForClose();
translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); translog.newTranslog();
} catch (IOException e) { } catch (IOException e) {
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} finally { } finally {
@ -355,7 +355,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
} else { } else {
try { try {
indexWriter.commit(); indexWriter.commit();
translog.newTranslog(IndexReader.getCurrentVersion(store.directory())); translog.newTranslog();
} catch (IOException e) { } catch (IOException e) {
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} }

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.collect.ImmutableList;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class CommitPoint {
public static class FileInfo {
private final String name;
private final String physicalName;
private final long length;
public FileInfo(String name, String physicalName, long length) {
this.name = name;
this.physicalName = physicalName;
this.length = length;
}
public String name() {
return name;
}
public String physicalName() {
return this.physicalName;
}
public long length() {
return length;
}
}
public static enum Type {
GENERATED,
SAVED
}
private final long version;
private final String name;
private final Type type;
private final ImmutableList<FileInfo> indexFiles;
private final ImmutableList<FileInfo> translogFiles;
public CommitPoint(long version, String name, Type type, List<FileInfo> indexFiles, List<FileInfo> translogFiles) {
this.version = version;
this.name = name;
this.type = type;
this.indexFiles = ImmutableList.copyOf(indexFiles);
this.translogFiles = ImmutableList.copyOf(translogFiles);
}
public long version() {
return version;
}
public String name() {
return this.name;
}
public Type type() {
return this.type;
}
public ImmutableList<FileInfo> indexFiles() {
return this.indexFiles;
}
public ImmutableList<FileInfo> translogFiles() {
return this.translogFiles;
}
public boolean containPhysicalIndexFile(String physicalName) {
return findPhysicalIndexFile(physicalName) != null;
}
public CommitPoint.FileInfo findPhysicalIndexFile(String physicalName) {
for (FileInfo file : indexFiles) {
if (file.physicalName().equals(physicalName)) {
return file;
}
}
return null;
}
public CommitPoint.FileInfo findNameFile(String name) {
CommitPoint.FileInfo fileInfo = findNameIndexFile(name);
if (fileInfo != null) {
return fileInfo;
}
return findNameTranslogFile(name);
}
public CommitPoint.FileInfo findNameIndexFile(String name) {
for (FileInfo file : indexFiles) {
if (file.name().equals(name)) {
return file;
}
}
return null;
}
public CommitPoint.FileInfo findNameTranslogFile(String name) {
for (FileInfo file : translogFiles) {
if (file.name().equals(name)) {
return file;
}
}
return null;
}
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class CommitPoints implements Iterable<CommitPoint> {
private final ImmutableList<CommitPoint> commitPoints;
public CommitPoints(List<CommitPoint> commitPoints) {
Collections.sort(commitPoints, new Comparator<CommitPoint>() {
@Override public int compare(CommitPoint o1, CommitPoint o2) {
return (o2.version() < o1.version() ? -1 : (o2.version() == o1.version() ? 0 : 1));
}
});
this.commitPoints = ImmutableList.copyOf(commitPoints);
}
public ImmutableList<CommitPoint> commits() {
return this.commitPoints;
}
public boolean hasVersion(long version) {
for (CommitPoint commitPoint : commitPoints) {
if (commitPoint.version() == version) {
return true;
}
}
return false;
}
public CommitPoint.FileInfo findPhysicalIndexFile(String physicalName) {
for (CommitPoint commitPoint : commitPoints) {
CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(physicalName);
if (fileInfo != null) {
return fileInfo;
}
}
return null;
}
public CommitPoint.FileInfo findNameFile(String name) {
for (CommitPoint commitPoint : commitPoints) {
CommitPoint.FileInfo fileInfo = commitPoint.findNameFile(name);
if (fileInfo != null) {
return fileInfo;
}
}
return null;
}
@Override public Iterator<CommitPoint> iterator() {
return commitPoints.iterator();
}
public static byte[] toXContent(CommitPoint commitPoint) throws Exception {
XContentBuilder builder = XContentFactory.contentBinaryBuilder(XContentType.JSON).prettyPrint();
builder.startObject();
builder.field("version", commitPoint.version());
builder.field("name", commitPoint.name());
builder.field("type", commitPoint.type().toString());
builder.startObject("index_files");
for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
builder.startObject(fileInfo.name());
builder.field("physical_name", fileInfo.physicalName());
builder.field("length", fileInfo.length());
builder.endObject();
}
builder.endObject();
builder.startObject("translog_files");
for (CommitPoint.FileInfo fileInfo : commitPoint.translogFiles()) {
builder.startObject(fileInfo.name());
builder.field("physical_name", fileInfo.physicalName());
builder.field("length", fileInfo.length());
builder.endObject();
}
builder.endObject();
builder.endObject();
return builder.copiedBytes();
}
public static CommitPoint fromXContent(byte[] data) throws Exception {
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
try {
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token == null) {
// no data...
throw new IOException("No commit point data");
}
long version = -1;
String name = null;
CommitPoint.Type type = null;
List<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
List<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
List<CommitPoint.FileInfo> files = null;
if ("index_files".equals(currentFieldName) || "indexFiles".equals(currentFieldName)) {
files = indexFiles;
} else if ("translog_files".equals(currentFieldName) || "translogFiles".equals(currentFieldName)) {
files = translogFiles;
} else {
throw new IOException("Can't handle object with name [" + currentFieldName + "]");
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
String fileName = currentFieldName;
String physicalName = null;
long size = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("physical_name".equals(currentFieldName) || "physicalName".equals(currentFieldName)) {
physicalName = parser.text();
} else if ("length".equals(currentFieldName)) {
size = parser.longValue();
}
}
}
if (physicalName == null) {
throw new IOException("Malformed commit, missing physical_name for [" + fileName + "]");
}
if (size == -1) {
throw new IOException("Malformed commit, missing size for [" + fileName + "]");
}
files.add(new CommitPoint.FileInfo(fileName, physicalName, size));
}
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
version = parser.longValue();
} else if ("name".equals(currentFieldName)) {
name = parser.text();
} else if ("type".equals(currentFieldName)) {
type = CommitPoint.Type.valueOf(parser.text());
}
}
}
if (version == -1) {
throw new IOException("Malformed commit, missing version");
}
if (name == null) {
throw new IOException("Malformed commit, missing name");
}
if (type == null) {
throw new IOException("Malformed commit, missing type");
}
return new CommitPoint(version, name, type, indexFiles, translogFiles);
} finally {
parser.close();
}
}
}

View File

@ -237,7 +237,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n"); sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n"); sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().currentTranslogOperations()).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]"); sb.append(" translog : id [").append(lastTranslogId).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]");
logger.debug(sb.toString()); logger.debug(sb.toString());
} }
} }

View File

@ -120,8 +120,6 @@ public class SnapshotStatus {
} }
public static class Translog { public static class Translog {
private volatile int currentTranslogOperations;
private long startTime; private long startTime;
private long time; private long time;
@ -140,13 +138,5 @@ public class SnapshotStatus {
public void time(long time) { public void time(long time) {
this.time = time; this.time = time;
} }
public void addTranslogOperations(long count) {
this.currentTranslogOperations += count;
}
public long currentTranslogOperations() {
return this.currentTranslogOperations;
}
} }
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.Gateway;
@ -32,10 +33,13 @@ import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.CommitPoints;
import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -65,9 +69,24 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
this.indexPath = this.gateway.basePath().add("indices").add(index.name()); this.indexPath = this.gateway.basePath().add("indices").add(index.name());
} }
public ImmutableMap<String, BlobMetaData> listIndexBlobs(int shardId) throws IOException { public CommitPoint findCommitPoint(int shardId) throws IOException {
ImmutableBlobContainer indexContainer = blobStore.immutableBlobContainer(shardIndexPath(shardId)); ImmutableBlobContainer container = blobStore.immutableBlobContainer(shardPath(shardId));
return BlobStoreIndexShardGateway.buildVirtualBlobs(indexContainer, indexContainer.listBlobs(), null); ImmutableMap<String, BlobMetaData> blobs = container.listBlobs();
List<CommitPoint> commitPointsList = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith("commit-")) {
try {
commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name)));
} catch (Exception e) {
logger.warn("failed to read commit point [{}]", name);
}
}
}
CommitPoints commitPoints = new CommitPoints(commitPointsList);
if (commitPoints.commits().isEmpty()) {
return null;
}
return commitPoints.commits().get(0);
} }
@Override public String toString() { @Override public String toString() {
@ -78,10 +97,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
return blobStore; return blobStore;
} }
public BlobPath indexPath() {
return this.indexPath;
}
public ByteSizeValue chunkSize() { public ByteSizeValue chunkSize() {
return this.chunkSize; return this.chunkSize;
} }
@ -90,14 +105,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
return indexPath.add(Integer.toString(shardId)); return indexPath.add(Integer.toString(shardId));
} }
public BlobPath shardIndexPath(int shardId) {
return shardPath(shardId).add("index");
}
public BlobPath shardTranslogPath(int shardId) {
return shardPath(shardId).add("translog");
}
@Override public void close(boolean delete) throws ElasticSearchException { @Override public void close(boolean delete) throws ElasticSearchException {
if (delete) { if (delete) {
blobStore.delete(indexPath); blobStore.delete(indexPath);

View File

@ -24,22 +24,17 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.Hex;
import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.collect.Iterables;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.FastByteArrayInputStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput; import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.*; import org.elasticsearch.index.gateway.*;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
@ -53,16 +48,9 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.security.MessageDigest; import java.util.Iterator;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -86,13 +74,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
protected final BlobPath shardPath; protected final BlobPath shardPath;
protected final ImmutableBlobContainer indexContainer; protected final ImmutableBlobContainer blobContainer;
protected final AppendableBlobContainer translogContainer;
protected final ConcurrentMap<String, String> cachedMd5 = ConcurrentCollections.newConcurrentMap();
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
private volatile RecoveryStatus recoveryStatus; private volatile RecoveryStatus recoveryStatus;
@ -114,8 +96,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
this.blobStore = blobStoreIndexGateway.blobStore(); this.blobStore = blobStoreIndexGateway.blobStore();
this.shardPath = blobStoreIndexGateway.shardPath(shardId.id()); this.shardPath = blobStoreIndexGateway.shardPath(shardId.id());
this.indexContainer = blobStore.immutableBlobContainer(blobStoreIndexGateway.shardIndexPath(shardId.id())); this.blobContainer = blobStore.immutableBlobContainer(shardPath);
this.translogContainer = blobStore.appendableBlobContainer(blobStoreIndexGateway.shardTranslogPath(shardId.id()));
this.recoveryStatus = new RecoveryStatus(); this.recoveryStatus = new RecoveryStatus();
} }
@ -133,10 +114,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
@Override public void close(boolean delete) throws ElasticSearchException { @Override public void close(boolean delete) throws ElasticSearchException {
if (translogBlob != null) {
translogBlob.close();
translogBlob = null;
}
if (delete) { if (delete) {
blobStore.delete(shardPath); blobStore.delete(shardPath);
} }
@ -182,85 +159,73 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
private void doSnapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException { private void doSnapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
ImmutableMap<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "failed to list blobs", e);
}
long generation = findLatestFileNameGeneration(blobs);
CommitPoints commitPoints = buildCommitPoints(blobs);
currentSnapshotStatus.index().startTime(System.currentTimeMillis()); currentSnapshotStatus.index().startTime(System.currentTimeMillis());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.INDEX); currentSnapshotStatus.updateStage(SnapshotStatus.Stage.INDEX);
boolean indexDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit(); final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot(); final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
ImmutableMap<String, BlobMetaData> indicesBlobs = null; final CountDownLatch indexLatch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
ImmutableMap<String, BlobMetaData> virtualIndicesBlobs = null; final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
final List<CommitPoint.FileInfo> indexCommitPointFiles = Lists.newArrayList();
int indexNumberOfFiles = 0; int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0; long indexTotalFilesSize = 0;
if (snapshot.indexChanged()) { for (final String fileName : snapshotIndexCommit.getFiles()) {
long time = System.currentTimeMillis(); StoreFileMetaData storeMetaData;
indexDirty = true;
try { try {
indicesBlobs = indexContainer.listBlobs(); storeMetaData = store.metaData(fileName);
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to list indices files from gateway", e); throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
} }
virtualIndicesBlobs = buildVirtualBlobs(indexContainer, indicesBlobs, cachedMd5);
// snapshot into the index boolean snapshotRequired = false;
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length); if (snapshot.indexChanged() && fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>(); snapshotRequired = true; // we want to always snapshot the segment file if the index changed
}
for (final String fileName : snapshotIndexCommit.getFiles()) { CommitPoint.FileInfo fileInfo = commitPoints.findPhysicalIndexFile(fileName);
StoreFileMetaData snapshotFileMetaData; if (fileInfo == null || fileInfo.length() != storeMetaData.length() || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
try { // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
snapshotFileMetaData = store.metaDataWithMd5(fileName); snapshotRequired = true;
} catch (IOException e) { }
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
latch.countDown();
continue;
}
// if the file exists in the gateway, and has the same length, don't copy it over
if (virtualIndicesBlobs.containsKey(fileName) && virtualIndicesBlobs.get(fileName).md5().equals(snapshotFileMetaData.md5())) {
latch.countDown();
continue;
}
// we are snapshotting the file
if (snapshotRequired) {
indexNumberOfFiles++; indexNumberOfFiles++;
indexTotalFilesSize += snapshotFileMetaData.sizeInBytes(); indexTotalFilesSize += storeMetaData.length();
// create a new FileInfo
if (virtualIndicesBlobs.containsKey(fileName)) {
try {
cachedMd5.remove(fileName);
indexContainer.deleteBlobsByPrefix(fileName);
} catch (IOException e) {
logger.debug("failed to delete [" + fileName + "] before snapshotting, ignoring...");
}
}
try { try {
snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileMetaData, latch, failures); CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), storeMetaData.name(), storeMetaData.length());
indexCommitPointFiles.add(snapshotFileInfo);
snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileInfo, indexLatch, failures);
} catch (IOException e) { } catch (IOException e) {
failures.add(e); failures.add(e);
latch.countDown(); indexLatch.countDown();
} }
} else {
indexCommitPointFiles.add(fileInfo);
indexLatch.countDown();
} }
}
currentSnapshotStatus.index().files(indexNumberOfFiles, indexTotalFilesSize);
currentSnapshotStatus.index().files(indexNumberOfFiles + 1 /* for the segment */, indexTotalFilesSize); try {
indexLatch.await();
try { } catch (InterruptedException e) {
latch.await(); failures.add(e);
} catch (InterruptedException e) { }
failures.add(e); if (!failures.isEmpty()) {
} throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
if (!failures.isEmpty()) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (index files)", failures.get(failures.size() - 1));
}
} }
currentSnapshotStatus.index().time(System.currentTimeMillis() - currentSnapshotStatus.index().startTime()); currentSnapshotStatus.index().time(System.currentTimeMillis() - currentSnapshotStatus.index().startTime());
@ -268,133 +233,97 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.TRANSLOG); currentSnapshotStatus.updateStage(SnapshotStatus.Stage.TRANSLOG);
currentSnapshotStatus.translog().startTime(System.currentTimeMillis()); currentSnapshotStatus.translog().startTime(System.currentTimeMillis());
if (snapshot.newTranslogCreated() || snapshot.sameTranslogNewOperations()) { // Note, we assume the snapshot is always started from "base 0". We need to seek forward if we want to lastTranslogPosition if we want the delta
if (snapshot.newTranslogCreated() && translogBlob != null) { List<CommitPoint.FileInfo> translogCommitPointFiles = Lists.newArrayList();
translogBlob.close(); boolean snapshotRequired = snapshot.newTranslogCreated();
translogBlob = null; if (!snapshot.newTranslogCreated()) {
} // if we have a commit point, check that we have all the files listed in it
if (!commitPoints.commits().isEmpty()) {
if (translogBlob == null) { CommitPoint commitPoint = commitPoints.commits().get(0);
try { boolean allTranslogFilesExists = true;
translogBlob = translogContainer.appendBlob("translog-" + translogSnapshot.translogId()); for (CommitPoint.FileInfo fileInfo : commitPoint.translogFiles()) {
} catch (IOException e) { if (!commitPointFileExistsInBlobs(fileInfo, blobs)) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to create translog", e); allTranslogFilesExists = false;
}
}
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
translogBlob.append(new AppendableBlobContainer.AppendBlobListener() {
@Override public void withStream(StreamOutput os) throws IOException {
if (!snapshot.newTranslogCreated()) {
translogSnapshot.seekForward(snapshot.lastTranslogPosition());
}
BytesStreamOutput bout = CachedStreamOutput.cachedBytes();
while (translogSnapshot.hasNext()) {
bout.reset();
bout.writeInt(0);
TranslogStreams.writeTranslogOperation(bout, translogSnapshot.next());
bout.flush();
int size = bout.size();
bout.seek(0);
bout.writeInt(size - 4);
os.writeBytes(bout.unsafeByteArray(), size);
currentSnapshotStatus.translog().addTranslogOperations(1);
}
}
@Override public void onCompleted() {
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failure.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
failure.set(e);
}
if (failure.get() != null) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", failure.get());
}
}
currentSnapshotStatus.translog().time(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
// now write the segments file
if (indexDirty) {
try {
if (indicesBlobs.containsKey(snapshotIndexCommit.getSegmentsFileName())) {
cachedMd5.remove(snapshotIndexCommit.getSegmentsFileName());
indexContainer.deleteBlob(snapshotIndexCommit.getSegmentsFileName());
}
StoreFileMetaData snapshotFileMetaData = store.metaDataWithMd5(snapshotIndexCommit.getSegmentsFileName());
indexTotalFilesSize += snapshotFileMetaData.sizeInBytes();
long time = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(1);
CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileMetaData, latch, failures);
latch.await();
if (!failures.isEmpty()) {
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to perform snapshot (segment index file)", failures.get(failures.size() - 1));
}
} catch (Exception e) {
if (e instanceof IndexShardGatewaySnapshotFailedException) {
throw (IndexShardGatewaySnapshotFailedException) e;
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + snapshotIndexCommit.getSegmentsFileName() + "]", e);
}
}
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FINALIZE);
// delete the old translog
if (snapshot.newTranslogCreated()) {
try {
translogContainer.deleteBlobsByFilter(new BlobContainer.BlobNameFilter() {
@Override public boolean accept(String blobName) {
// delete all the ones that are not this translog
return !blobName.equals("translog-" + translogSnapshot.translogId());
}
});
} catch (Exception e) {
// ignore
}
// NOT doing this one, the above allows us to clean the translog properly
// try {
// translogContainer.deleteBlob("translog-" + snapshot.lastTranslogId());
// } catch (Exception e) {
// // ignore
// }
}
// delete old index files
if (indexDirty) {
for (BlobMetaData md : virtualIndicesBlobs.values()) {
boolean found = false;
for (final String fileName : snapshotIndexCommit.getFiles()) {
if (md.name().equals(fileName)) {
found = true;
break; break;
} }
} }
if (!found) { // if everything exists, we can seek forward in case there are new operations, otherwise, we copy over all again...
try { if (allTranslogFilesExists) {
cachedMd5.remove(md.name()); translogCommitPointFiles.addAll(commitPoint.translogFiles());
indexContainer.deleteBlobsByPrefix(md.name()); if (snapshot.sameTranslogNewOperations()) {
} catch (IOException e) { translogSnapshot.seekForward(snapshot.lastTranslogPosition());
logger.debug("failed to delete unused index files, will retry later...", e); snapshotRequired = true;
} }
} else {
snapshotRequired = true;
}
}
}
if (snapshotRequired) {
CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes());
translogCommitPointFiles.add(addedTranslogFileInfo);
try {
snapshotTranslog(translogSnapshot, addedTranslogFileInfo);
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", e);
}
}
currentSnapshotStatus.translog().time(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
// now create and write the commit point
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FINALIZE);
long version = 0;
if (!commitPoints.commits().isEmpty()) {
version = commitPoints.commits().iterator().next().version() + 1;
}
String commitPointName = "commit-" + Long.toString(version, Character.MAX_RADIX);
CommitPoint commitPoint = new CommitPoint(version, commitPointName, CommitPoint.Type.GENERATED, indexCommitPointFiles, translogCommitPointFiles);
try {
byte[] commitPointData = CommitPoints.toXContent(commitPoint);
blobContainer.writeBlob(commitPointName, new FastByteArrayInputStream(commitPointData), commitPointData.length);
} catch (Exception e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to write commit point", e);
}
// delete all files that are not referenced by any commit point
// build a new CommitPoint, that includes this one and all the saved ones
List<CommitPoint> newCommitPointsList = Lists.newArrayList();
newCommitPointsList.add(commitPoint);
for (CommitPoint point : commitPoints) {
if (point.type() == CommitPoint.Type.SAVED) {
newCommitPointsList.add(point);
}
}
CommitPoints newCommitPoints = new CommitPoints(newCommitPointsList);
// first, go over and delete all the commit points
for (String blobName : blobs.keySet()) {
if (!blobName.startsWith("commit-")) {
continue;
}
long checkedVersion = Long.parseLong(blobName.substring("commit-".length()), Character.MAX_RADIX);
if (!newCommitPoints.hasVersion(checkedVersion)) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
// ignore
}
}
}
// now go over all the blobs, and if they don't exists in a commit point, delete them
for (String blobName : blobs.keySet()) {
String name = blobName;
if (name.startsWith("commit-")) {
continue;
}
if (blobName.contains(".part")) {
name = blobName.substring(0, blobName.indexOf(".part"));
}
if (newCommitPoints.findNameFile(name) == null) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
// ignore, will delete it laters
} }
} }
} }
@ -403,46 +332,65 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { @Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
this.recoveryStatus = recoveryStatus; this.recoveryStatus = recoveryStatus;
recoveryStatus.index().startTime(System.currentTimeMillis()); final ImmutableMap<String, BlobMetaData> blobs;
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX); try {
recoverIndex(); blobs = blobContainer.listBlobs();
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); } catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
}
recoveryStatus.translog().startTime(System.currentTimeMillis()); CommitPoints commitPoints = buildCommitPoints(blobs);
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG); if (commitPoints.commits().isEmpty()) {
recoverTranslog(); recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;
}
for (CommitPoint commitPoint : commitPoints) {
if (!commitPointExistsInBlobs(commitPoint, blobs)) {
logger.warn("listed commit_point [{}]/[{}], but not all files exists, ignoring", commitPoint.name(), commitPoint.version());
continue;
}
try {
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
recoverIndex(commitPoint, blobs);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.TRANSLOG);
recoverTranslog(commitPoint, blobs);
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId, "failed to recover commit_point [" + commitPoint.name() + "]/[" + commitPoint.version() + "]", e);
}
}
throw new IndexShardGatewayRecoveryException(shardId, "No commit point data is available in gateway", null);
} }
private void recoverTranslog() throws IndexShardGatewayRecoveryException { private void recoverTranslog(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws IndexShardGatewayRecoveryException {
long translogId; if (commitPoint.translogFiles().isEmpty()) {
try { // no translog files, bail
translogId = IndexReader.getCurrentVersion(store.directory());
} catch (FileNotFoundException e) {
// no index, that fine
indexShard.start();
return;
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover translog, can't read current index version", e);
}
if (!translogContainer.blobExists("translog-" + translogId)) {
// no recovery file found, start the shard and bail
indexShard.start(); indexShard.start();
return; return;
} }
try { try {
indexShard.performRecoveryPrepareForTranslog(); indexShard.performRecoveryPrepareForTranslog();
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
translogContainer.readBlob("translog-" + translogId, new BlobContainer.ReadBlobListener() { final Iterator<CommitPoint.FileInfo> transIt = commitPoint.translogFiles().iterator();
blobContainer.readBlob(transIt.next().name(), new BlobContainer.ReadBlobListener() {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
boolean ignore = false; boolean ignore = false;
@Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { @Override public void onPartial(byte[] data, int offset, int size) throws IOException {
if (ignore) { if (ignore) {
return; return;
} }
@ -493,15 +441,20 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
@Override public synchronized void onCompleted() { @Override public synchronized void onCompleted() {
latch.countDown(); if (!transIt.hasNext()) {
latch.countDown();
return;
}
blobContainer.readBlob(transIt.next().name(), this);
} }
@Override public synchronized void onFailure(Throwable t) { @Override public void onFailure(Throwable t) {
failure.set(t); failure.set(t);
latch.countDown(); latch.countDown();
} }
}); });
latch.await(); latch.await();
if (failure.get() != null) { if (failure.get() != null) {
throw failure.get(); throw failure.get();
@ -513,52 +466,40 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
} }
private void recoverIndex() throws IndexShardGatewayRecoveryException { private void recoverIndex(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) throws Exception {
final ImmutableMap<String, BlobMetaData> indicesBlobs;
try {
indicesBlobs = indexContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to list content of gateway", e);
}
ImmutableMap<String, BlobMetaData> virtualIndicesBlobs = buildVirtualBlobs(indexContainer, indicesBlobs, cachedMd5);
int numberOfFiles = 0; int numberOfFiles = 0;
long totalSize = 0; long totalSize = 0;
int numberOfExistingFiles = 0; int numberOfExistingFiles = 0;
long existingTotalSize = 0; long existingTotalSize = 0;
// filter out only the files that we need to recover, and reuse ones that exists in the store List<CommitPoint.FileInfo> filesToRecover = Lists.newArrayList();
List<BlobMetaData> filesToRecover = new ArrayList<BlobMetaData>(); for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
for (BlobMetaData virtualMd : virtualIndicesBlobs.values()) { StoreFileMetaData storeFile = store.metaData(fileInfo.physicalName());
// if the store has the file, and it has the same length, don't recover it if (storeFile != null && !storeFile.name().contains("segment") && storeFile.length() == fileInfo.length()) {
try { numberOfExistingFiles++;
StoreFileMetaData storeMd = store.metaDataWithMd5(virtualMd.name()); existingTotalSize += storeFile.length();
if (storeMd != null && storeMd.md5().equals(virtualMd.md5())) { totalSize += storeFile.length();
numberOfExistingFiles++; if (logger.isTraceEnabled()) {
existingTotalSize += virtualMd.sizeInBytes(); logger.trace("not_recovering [{}], exists in local store and has same length [{}]", fileInfo.physicalName(), fileInfo.length());
totalSize += virtualMd.sizeInBytes();
if (logger.isTraceEnabled()) {
logger.trace("not_recovering [{}], exists in local store and has same md5 [{}]", virtualMd.name(), virtualMd.md5());
}
} else {
if (logger.isTraceEnabled()) {
if (storeMd == null) {
logger.trace("recovering [{}], does not exists in local store", virtualMd.name());
} else {
logger.trace("recovering [{}], exists in local store but has different md5: gateway [{}], local [{}]", virtualMd.name(), virtualMd.md5(), storeMd.md5());
}
}
numberOfFiles++;
totalSize += virtualMd.sizeInBytes();
filesToRecover.add(virtualMd);
} }
} catch (Exception e) { } else {
filesToRecover.add(virtualMd); if (logger.isTraceEnabled()) {
logger.debug("failed to check local store for existence of [{}]", e, virtualMd.name()); if (storeFile == null) {
logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
} else {
logger.trace("recovering [{}], exists in local store but has different length: gateway [{}], local [{}]", fileInfo.physicalName(), fileInfo.length(), storeFile.length());
}
}
numberOfFiles++;
totalSize += fileInfo.length();
filesToRecover.add(fileInfo);
} }
} }
recoveryStatus.index().files(numberOfFiles, totalSize, numberOfExistingFiles, existingTotalSize); recoveryStatus.index().files(numberOfFiles, totalSize, numberOfExistingFiles, existingTotalSize);
if (filesToRecover.isEmpty()) {
logger.trace("no files to recover, all exists within the local store");
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize)); logger.trace("recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize));
@ -566,8 +507,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>(); final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final BlobMetaData fileToRecover : filesToRecover) {
recoverFile(fileToRecover, indicesBlobs, latch, failures); for (final CommitPoint.FileInfo fileToRecover : filesToRecover) {
recoverFile(fileToRecover, blobs, latch, failures);
} }
try { try {
@ -594,7 +536,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
/// now, go over and clean files that are in the store, but were not in the gateway /// now, go over and clean files that are in the store, but were not in the gateway
try { try {
for (String storeFile : store.directory().listAll()) { for (String storeFile : store.directory().listAll()) {
if (!virtualIndicesBlobs.containsKey(storeFile)) { if (!commitPoint.containPhysicalIndexFile(storeFile)) {
try { try {
store.directory().deleteFile(storeFile); store.directory().deleteFile(storeFile);
} catch (IOException e) { } catch (IOException e) {
@ -607,42 +549,41 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
} }
private void recoverFile(final BlobMetaData fileToRecover, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) { private void recoverFile(final CommitPoint.FileInfo fileInfo, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
final IndexOutput indexOutput; final IndexOutput indexOutput;
try { try {
indexOutput = store.directory().createOutput(fileToRecover.name()); indexOutput = store.directory().createOutput(fileInfo.physicalName());
} catch (IOException e) { } catch (IOException e) {
failures.add(e); failures.add(e);
latch.countDown(); latch.countDown();
return; return;
} }
String firstFileToRecover = fileToRecover.name(); String firstFileToRecover = fileInfo.name();
if (!blobs.containsKey(fileToRecover.name())) { if (!blobs.containsKey(fileInfo.name())) {
// chunking, append part0 to it // chunking, append part0 to it
firstFileToRecover = fileToRecover.name() + ".part0"; firstFileToRecover = fileInfo.name() + ".part0";
} }
if (!blobs.containsKey(firstFileToRecover)) { if (!blobs.containsKey(firstFileToRecover)) {
// no file, what to do, what to do? // no file, what to do, what to do?
logger.warn("no file [{}] to recover, even though it has md5, ignoring it", fileToRecover.name()); logger.warn("no file [{}]/[{}] to recover, ignoring it", fileInfo.name(), fileInfo.physicalName());
latch.countDown(); latch.countDown();
return; return;
} }
final AtomicInteger partIndex = new AtomicInteger(); final AtomicInteger partIndex = new AtomicInteger();
final MessageDigest digest = Digest.getMd5Digest();
indexContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() { blobContainer.readBlob(firstFileToRecover, new BlobContainer.ReadBlobListener() {
@Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException { @Override public synchronized void onPartial(byte[] data, int offset, int size) throws IOException {
recoveryStatus.index().addCurrentFilesSize(size); recoveryStatus.index().addCurrentFilesSize(size);
indexOutput.writeBytes(data, offset, size); indexOutput.writeBytes(data, offset, size);
digest.update(data, offset, size);
} }
@Override public synchronized void onCompleted() { @Override public synchronized void onCompleted() {
int part = partIndex.incrementAndGet(); int part = partIndex.incrementAndGet();
String partName = fileToRecover.name() + ".part" + part; String partName = fileInfo.name() + ".part" + part;
if (blobs.containsKey(partName)) { if (blobs.containsKey(partName)) {
// continue with the new part // continue with the new part
indexContainer.readBlob(partName, this); blobContainer.readBlob(partName, this);
return; return;
} else { } else {
// we are done... // we are done...
@ -653,12 +594,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
return; return;
} }
} }
// double check the md5, warn if it does not equal...
String md5 = Hex.encodeHexString(digest.digest());
if (!md5.equals(fileToRecover.md5())) {
logger.warn("file [{}] has different md5, actual read content [{}], store [{}]", fileToRecover.name(), md5, fileToRecover.md5());
}
latch.countDown(); latch.countDown();
} }
@ -669,13 +604,46 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}); });
} }
private void snapshotFile(Directory dir, final StoreFileMetaData fileMetaData, final CountDownLatch latch, final List<Throwable> failures) throws IOException { private void snapshotTranslog(Translog.Snapshot snapshot, CommitPoint.FileInfo fileInfo) throws IOException {
blobContainer.writeBlob(fileInfo.name(), snapshot.stream(), snapshot.lengthInBytes());
//
// long chunkBytes = Long.MAX_VALUE;
// if (chunkSize != null) {
// chunkBytes = chunkSize.bytes();
// }
//
// long totalLength = fileInfo.length();
// long numberOfChunks = totalLength / chunkBytes;
// if (totalLength % chunkBytes > 0) {
// numberOfChunks++;
// }
// if (numberOfChunks == 0) {
// numberOfChunks++;
// }
//
// if (numberOfChunks == 1) {
// blobContainer.writeBlob(fileInfo.name(), snapshot.stream(), snapshot.lengthInBytes());
// } else {
// InputStream translogStream = snapshot.stream();
// long totalLengthLeftToWrite = totalLength;
// for (int i = 0; i < numberOfChunks; i++) {
// long lengthToWrite = chunkBytes;
// if (totalLengthLeftToWrite < chunkBytes) {
// lengthToWrite = totalLengthLeftToWrite;
// }
// blobContainer.writeBlob(fileInfo.name() + ".part" + i, new LimitInputStream(translogStream, lengthToWrite), lengthToWrite);
// totalLengthLeftToWrite -= lengthToWrite;
// }
// }
}
private void snapshotFile(Directory dir, final CommitPoint.FileInfo fileInfo, final CountDownLatch latch, final List<Throwable> failures) throws IOException {
long chunkBytes = Long.MAX_VALUE; long chunkBytes = Long.MAX_VALUE;
if (chunkSize != null) { if (chunkSize != null) {
chunkBytes = chunkSize.bytes(); chunkBytes = chunkSize.bytes();
} }
long totalLength = fileMetaData.sizeInBytes(); long totalLength = fileInfo.length();
long numberOfChunks = totalLength / chunkBytes; long numberOfChunks = totalLength / chunkBytes;
if (totalLength % chunkBytes > 0) { if (totalLength % chunkBytes > 0) {
numberOfChunks++; numberOfChunks++;
@ -687,22 +655,22 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
final long fNumberOfChunks = numberOfChunks; final long fNumberOfChunks = numberOfChunks;
final AtomicLong counter = new AtomicLong(numberOfChunks); final AtomicLong counter = new AtomicLong(numberOfChunks);
for (long i = 0; i < fNumberOfChunks; i++) { for (long i = 0; i < fNumberOfChunks; i++) {
final long chunkNumber = i; final long partNumber = i;
IndexInput indexInput = null; IndexInput indexInput = null;
try { try {
indexInput = dir.openInput(fileMetaData.name()); indexInput = dir.openInput(fileInfo.physicalName());
indexInput.seek(chunkNumber * chunkBytes); indexInput.seek(partNumber * chunkBytes);
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes); InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes);
String blobName = fileMetaData.name(); String blobName = fileInfo.name();
if (fNumberOfChunks > 1) { if (fNumberOfChunks > 1) {
// if we do chunks, then all of them are in the form of "[xxx].part[N]". // if we do chunks, then all of them are in the form of "[xxx].part[N]".
blobName += ".part" + chunkNumber; blobName += ".part" + partNumber;
} }
final IndexInput fIndexInput = indexInput; final IndexInput fIndexInput = indexInput;
indexContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() { blobContainer.writeBlob(blobName, is, is.actualSizeToRead(), new ImmutableBlobContainer.WriterListener() {
@Override public void onCompleted() { @Override public void onCompleted() {
try { try {
fIndexInput.close(); fIndexInput.close();
@ -710,18 +678,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// ignore // ignore
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
// now, write the expected md5 latch.countDown();
byte[] md5 = Digest.md5HexToByteArray(fileMetaData.md5());
indexContainer.writeBlob(fileMetaData.name() + ".md5", new ByteArrayInputStream(md5), md5.length, new ImmutableBlobContainer.WriterListener() {
@Override public void onCompleted() {
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failures.add(t);
latch.countDown();
}
});
} }
} }
@ -751,48 +708,80 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} }
} }
public static ImmutableMap<String, BlobMetaData> buildVirtualBlobs(ImmutableBlobContainer container, ImmutableMap<String, BlobMetaData> blobs, @Nullable Map<String, String> cachedMd5) { private void writeCommitPoint(CommitPoint commitPoint) throws Exception {
// create a set of all the actual files based on .md5 extension byte[] data = CommitPoints.toXContent(commitPoint);
Set<String> names = Sets.newHashSet(); blobContainer.writeBlob("commit-" + commitPoint.version(), new FastByteArrayInputStream(data), data.length);
for (BlobMetaData blob : blobs.values()) { }
if (blob.name().endsWith(".md5")) {
names.add(blob.name().substring(0, blob.name().lastIndexOf(".md5"))); private boolean commitPointExistsInBlobs(CommitPoint commitPoint, ImmutableMap<String, BlobMetaData> blobs) {
for (CommitPoint.FileInfo fileInfo : Iterables.concat(commitPoint.indexFiles(), commitPoint.translogFiles())) {
if (!commitPointFileExistsInBlobs(fileInfo, blobs)) {
return false;
} }
} }
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder(); return true;
for (String name : names) { }
long sizeInBytes = 0;
if (blobs.containsKey(name)) { private boolean commitPointFileExistsInBlobs(CommitPoint.FileInfo fileInfo, ImmutableMap<String, BlobMetaData> blobs) {
// no chunking BlobMetaData blobMetaData = blobs.get(fileInfo.name());
sizeInBytes = blobs.get(name).sizeInBytes(); if (blobMetaData != null) {
} else { if (blobMetaData.length() != fileInfo.length()) {
// chunking... return false;
int part = 0; }
while (true) { } else if (blobs.containsKey(fileInfo.name() + ".part0")) {
BlobMetaData md = blobs.get(name + ".part" + part); // multi part file sum up the size and check
if (md == null) { int part = 0;
break; long totalSize = 0;
} while (true) {
sizeInBytes += md.sizeInBytes(); blobMetaData = blobs.get(fileInfo.name() + ".part" + part++);
part++; if (blobMetaData == null) {
break;
} }
totalSize += blobMetaData.length();
}
if (totalSize != fileInfo.length()) {
return false;
}
} else {
// no file, not exact and not multipart
return false;
}
return true;
}
private CommitPoints buildCommitPoints(ImmutableMap<String, BlobMetaData> blobs) {
List<CommitPoint> commitPoints = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith("commit-")) {
try {
commitPoints.add(CommitPoints.fromXContent(blobContainer.readBlobFully(name)));
} catch (Exception e) {
logger.warn("failed to read commit point [{}]", name);
}
}
}
return new CommitPoints(commitPoints);
}
private String fileNameFromGeneration(long generation) {
return "__" + Long.toString(generation, Character.MAX_RADIX);
}
private long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> blobs) {
long generation = -1;
for (String name : blobs.keySet()) {
if (name.startsWith("commit-")) {
continue;
}
if (name.contains(".part")) {
name = name.substring(0, name.indexOf(".part"));
} }
if (cachedMd5 != null && cachedMd5.containsKey(name)) { long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX);
builder.put(name, new PlainBlobMetaData(name, sizeInBytes, cachedMd5.get(name))); if (currentGen > generation) {
} else { generation = currentGen;
// no md5, get it
try {
String md5 = Digest.md5HexFromByteArray(container.readBlobFully(name + ".md5"));
if (cachedMd5 != null) {
cachedMd5.put(name, md5);
}
builder.put(name, new PlainBlobMetaData(name, sizeInBytes, md5));
} catch (Exception e) {
// don't add it!
}
} }
} }
return builder.build(); return generation;
} }
} }

View File

@ -97,28 +97,28 @@ public class RecoverySource extends AbstractComponent {
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) { for (String name : snapshot.getFiles()) {
StoreFileMetaData md = shard.store().metaDataWithMd5(name); StoreFileMetaData md = shard.store().metaData(name);
boolean useExisting = false; boolean useExisting = false;
if (request.existingFiles.containsKey(name)) { if (request.existingFiles().containsKey(name)) {
if (md.md5().equals(request.existingFiles.get(name).md5())) { if (!md.name().contains("segment") && md.length() == request.existingFiles().get(name).length()) {
response.phase1ExistingFileNames.add(name); response.phase1ExistingFileNames.add(name);
response.phase1ExistingFileSizes.add(md.sizeInBytes()); response.phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.sizeInBytes(); existingTotalSize += md.length();
useExisting = true; useExisting = true;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has md5 [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.md5()); logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.length());
} }
} }
} }
if (!useExisting) { if (!useExisting) {
if (request.existingFiles.containsKey(name)) { if (request.existingFiles().containsKey(name)) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different md5: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles.get(name).md5(), md.md5()); logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different length: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name).length(), md.length());
} else { } else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name); logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
} }
response.phase1FileNames.add(name); response.phase1FileNames.add(name);
response.phase1FileSizes.add(md.sizeInBytes()); response.phase1FileSizes.add(md.length());
totalSize += md.sizeInBytes(); totalSize += md.length();
} }
} }
response.phase1TotalSize = totalSize; response.phase1TotalSize = totalSize;

View File

@ -43,7 +43,7 @@ public class StartRecoveryRequest implements Streamable {
private boolean markAsRelocated; private boolean markAsRelocated;
Map<String, StoreFileMetaData> existingFiles; private Map<String, StoreFileMetaData> existingFiles;
StartRecoveryRequest() { StartRecoveryRequest() {
} }

View File

@ -91,7 +91,7 @@ public interface IndexStore extends IndexComponent {
public long totalSizeInBytes() { public long totalSizeInBytes() {
long totalSizeInBytes = 0; long totalSizeInBytes = 0;
for (StoreFileMetaData file : this) { for (StoreFileMetaData file : this) {
totalSizeInBytes += file.sizeInBytes(); totalSizeInBytes += file.length();
} }
return totalSizeInBytes; return totalSizeInBytes;
} }

View File

@ -38,12 +38,8 @@ public interface Store extends IndexShardComponent {
StoreFileMetaData metaData(String name) throws IOException; StoreFileMetaData metaData(String name) throws IOException;
StoreFileMetaData metaDataWithMd5(String name) throws IOException;
ImmutableMap<String, StoreFileMetaData> list() throws IOException; ImmutableMap<String, StoreFileMetaData> list() throws IOException;
ImmutableMap<String, StoreFileMetaData> listWithMd5() throws IOException;
/** /**
* Just deletes the content of the store. * Just deletes the content of the store.
*/ */

View File

@ -34,18 +34,15 @@ public class StoreFileMetaData implements Streamable {
private long lastModified; private long lastModified;
private long sizeInBytes; private long length;
private String md5;
StoreFileMetaData() { StoreFileMetaData() {
} }
public StoreFileMetaData(String name, long sizeInBytes, long lastModified, String md5) { public StoreFileMetaData(String name, long length, long lastModified) {
this.name = name; this.name = name;
this.lastModified = lastModified; this.lastModified = lastModified;
this.sizeInBytes = sizeInBytes; this.length = length;
this.md5 = md5;
} }
public String name() { public String name() {
@ -56,12 +53,8 @@ public class StoreFileMetaData implements Streamable {
return this.lastModified; return this.lastModified;
} }
public long sizeInBytes() { public long length() {
return sizeInBytes; return length;
}
public String md5() {
return md5;
} }
public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException { public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException {
@ -71,25 +64,16 @@ public class StoreFileMetaData implements Streamable {
} }
@Override public String toString() { @Override public String toString() {
return "name[" + name + "], sizeInBytes[" + sizeInBytes + "], md5[" + md5 + "]"; return "name [" + name + "], length [" + length + "]";
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
name = in.readUTF(); name = in.readUTF();
sizeInBytes = in.readVLong(); length = in.readVLong();
if (in.readBoolean()) {
md5 = in.readUTF();
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name); out.writeUTF(name);
out.writeVLong(sizeInBytes); out.writeVLong(length);
if (md5 == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(md5);
}
} }
} }

View File

@ -20,14 +20,11 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
@ -37,10 +34,8 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.store.support.AbstractIndexStore;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -49,8 +44,6 @@ public abstract class FsIndexStore extends AbstractIndexStore {
private final File location; private final File location;
private final ConcurrentMap<ShardId, ConcurrentMap<String, String>> cachedUnallocatedMd5s = ConcurrentCollections.newConcurrentMap();
public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService); super(index, indexSettings, indexService);
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
@ -93,7 +86,6 @@ public abstract class FsIndexStore extends AbstractIndexStore {
if (indexService.hasShard(shardId.id())) { if (indexService.hasShard(shardId.id())) {
throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted"); throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted");
} }
cachedUnallocatedMd5s.remove(shardId);
FileSystemUtils.deleteRecursively(shardLocation(shardId)); FileSystemUtils.deleteRecursively(shardLocation(shardId));
} }
@ -105,53 +97,13 @@ public abstract class FsIndexStore extends AbstractIndexStore {
if (!shardIndexLocation.exists()) { if (!shardIndexLocation.exists()) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of()); return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
} }
ConcurrentMap<String, String> shardIdCachedMd5s = cachedUnallocatedMd5s.get(shardId);
if (shardIdCachedMd5s == null) {
shardIdCachedMd5s = ConcurrentCollections.newConcurrentMap();
cachedUnallocatedMd5s.put(shardId, shardIdCachedMd5s);
}
Map<String, StoreFileMetaData> files = Maps.newHashMap(); Map<String, StoreFileMetaData> files = Maps.newHashMap();
for (File file : shardIndexLocation.listFiles()) { for (File file : shardIndexLocation.listFiles()) {
// ignore md5 files files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
if (file.getName().endsWith(".md5")) {
continue;
}
// calculate md5
String md5 = shardIdCachedMd5s.get(file.getName());
if (md5 == null) {
// first, see if there is an md5 file associated with it
File md5File = new File(shardIndexLocation, file.getName() + ".md5");
if (md5File.exists()) {
try {
byte[] md5Bytes = Streams.copyToByteArray(md5File);
md5 = Digest.md5HexFromByteArray(md5Bytes);
} catch (Exception e) {
// ignore, compute...
}
}
// not created as a file, compute it
if (md5 == null) {
FileInputStream is = new FileInputStream(file);
try {
md5 = Digest.md5Hex(is);
} finally {
is.close();
}
}
if (md5 != null) {
shardIdCachedMd5s.put(file.getName(), md5);
}
}
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), md5));
} }
return new StoreFilesMetaData(false, shardId, files); return new StoreFilesMetaData(false, shardId, files);
} }
ConcurrentMap<String, String> cachedShardMd5s(ShardId shardId) {
return cachedUnallocatedMd5s.get(shardId);
}
public File shardLocation(ShardId shardId) { public File shardLocation(ShardId shardId) {
return new File(location, Integer.toString(shardId.id())); return new File(location, Integer.toString(shardId.id()));
} }

View File

@ -34,7 +34,6 @@ import org.elasticsearch.index.store.support.AbstractStore;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -45,14 +44,6 @@ public abstract class FsStore extends AbstractStore {
super(shardId, indexSettings, indexStore); super(shardId, indexSettings, indexStore);
} }
@Override protected String preComputedMd5(String fileName) {
ConcurrentMap<String, String> shardIdCachedMd5s = ((FsIndexStore) indexStore).cachedShardMd5s(shardId);
if (shardIdCachedMd5s == null) {
return null;
}
return shardIdCachedMd5s.get(fileName);
}
@Override public void fullDelete() throws IOException { @Override public void fullDelete() throws IOException {
FileSystemUtils.deleteRecursively(fsDirectory().getFile()); FileSystemUtils.deleteRecursively(fsDirectory().getFile());
// if we are the last ones, delete also the actual index // if we are the last ones, delete also the actual index

View File

@ -53,7 +53,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
if (indexShard == null) { if (indexShard == null) {
return listUnallocatedStoreMetaData(shardId); return listUnallocatedStoreMetaData(shardId);
} else { } else {
return new StoreFilesMetaData(true, shardId, indexShard.store().listWithMd5()); return new StoreFilesMetaData(true, shardId, indexShard.store().list());
} }
} }

View File

@ -20,13 +20,10 @@
package org.elasticsearch.index.store.support; package org.elasticsearch.index.store.support;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.Hex;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
@ -38,7 +35,6 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.security.MessageDigest;
import java.util.Map; import java.util.Map;
/** /**
@ -72,44 +68,12 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return null; return null;
} }
// IndexOutput not closed, does not exists // IndexOutput not closed, does not exists
if (md.lastModified() == -1 || md.sizeInBytes() == -1) { if (md.lastModified() == -1 || md.length() == -1) {
return null; return null;
} }
return md; return md;
} }
@Override public StoreFileMetaData metaDataWithMd5(String name) throws IOException {
StoreFileMetaData md = metaData(name);
if (md == null) {
return null;
}
if (md.md5() == null) {
IndexInput in = directory().openInput(name);
String md5;
try {
InputStreamIndexInput is = new InputStreamIndexInput(in, Long.MAX_VALUE);
md5 = Digest.md5Hex(is);
} finally {
in.close();
}
synchronized (mutex) {
md = metaData(name);
if (md == null) {
return null;
}
if (md.md5() == null) {
if (shouldWriteMd5(name)) {
writeMd5File(directory(), name, md5);
}
md = new StoreFileMetaData(md.name(), md.sizeInBytes(), md.sizeInBytes(), md5);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
}
}
}
return md;
}
@Override public ImmutableMap<String, StoreFileMetaData> list() throws IOException { @Override public ImmutableMap<String, StoreFileMetaData> list() throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) { for (String name : files) {
@ -121,17 +85,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return builder.build(); return builder.build();
} }
@Override public ImmutableMap<String, StoreFileMetaData> listWithMd5() throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (String name : files) {
StoreFileMetaData md = metaDataWithMd5(name);
if (md != null) {
builder.put(md.name(), md);
}
}
return builder.build();
}
@Override public void deleteContent() throws IOException { @Override public void deleteContent() throws IOException {
Directories.deleteFiles(directory()); Directories.deleteFiles(directory());
} }
@ -155,21 +108,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
directory().close(); directory().close();
} }
protected String preComputedMd5(String fileName) {
return null;
}
private boolean shouldWriteMd5(String name) {
return !name.startsWith("segments") && !name.endsWith(".md5");
}
private void writeMd5File(Directory directory, String file, String md5) throws IOException {
byte[] md5Bytes = Digest.md5HexToByteArray(md5);
IndexOutput output = directory.createOutput(file + ".md5");
output.writeBytes(md5Bytes, md5Bytes.length);
output.close();
}
/** /**
* The idea of the store directory is to cache file level meta data, as well as md5 of it * The idea of the store directory is to cache file level meta data, as well as md5 of it
*/ */
@ -182,23 +120,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) { synchronized (mutex) {
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder(); MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
for (String file : delegate.listAll()) { for (String file : delegate.listAll()) {
if (file.endsWith(".md5")) { builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file)));
// md5 are files we create, ignore them
continue;
}
try {
String md5 = preComputedMd5(file);
if (md5 != null) {
if (shouldWriteMd5(file)) {
writeMd5File(delegate, file, md5);
}
}
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), md5));
} catch (FileNotFoundException e) {
// ignore
}
} }
filesMetadata = builder.immutableMap(); filesMetadata = builder.immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@ -230,23 +152,14 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) { synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name); StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) { if (metaData != null) {
metaData = new StoreFileMetaData(metaData.name(), metaData.sizeInBytes(), delegate.fileModified(name), metaData.md5()); metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name));
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
} }
} }
} }
@Override public void deleteFile(String name) throws IOException { @Override public void deleteFile(String name) throws IOException {
if (name.endsWith(".md5")) {
// ignore, this should not really happen...
return;
}
delegate.deleteFile(name); delegate.deleteFile(name);
try {
delegate.deleteFile(name + ".md5");
} catch (Exception e) {
// ignore
}
synchronized (mutex) { synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap(); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@ -259,8 +172,8 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
throw new FileNotFoundException(name); throw new FileNotFoundException(name);
} }
// not set yet (IndexOutput not closed) // not set yet (IndexOutput not closed)
if (metaData.sizeInBytes() != -1) { if (metaData.length() != -1) {
return metaData.sizeInBytes(); return metaData.length();
} }
return delegate.fileLength(name); return delegate.fileLength(name);
} }
@ -268,7 +181,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public IndexOutput createOutput(String name) throws IOException { @Override public IndexOutput createOutput(String name) throws IOException {
IndexOutput out = delegate.createOutput(name); IndexOutput out = delegate.createOutput(name);
synchronized (mutex) { synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null); StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
} }
@ -328,31 +241,15 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
private final String name; private final String name;
private MessageDigest digest;
private StoreIndexOutput(IndexOutput delegate, String name) { private StoreIndexOutput(IndexOutput delegate, String name) {
this.delegate = delegate; this.delegate = delegate;
this.name = name; this.name = name;
if (shouldWriteMd5(name)) {
this.digest = Digest.getMd5Digest();
} else {
this.digest = Digest.NULL_DIGEST;
}
} }
@Override public void close() throws IOException { @Override public void close() throws IOException {
delegate.close(); delegate.close();
synchronized (mutex) { synchronized (mutex) {
StoreFileMetaData md = filesMetadata.get(name); StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name));
String md5 = md == null ? null : md.md5();
byte[] digestBytes = digest.digest();
if (digestBytes != null) {
md5 = Hex.encodeHexString(digestBytes);
if (shouldWriteMd5(name)) {
writeMd5File(directory(), name, md5);
}
}
md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), md5);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap(); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
} }
@ -360,12 +257,10 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void writeByte(byte b) throws IOException { @Override public void writeByte(byte b) throws IOException {
delegate.writeByte(b); delegate.writeByte(b);
digest.update(b);
} }
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException { @Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length); delegate.writeBytes(b, offset, length);
digest.update(b, offset, length);
} }
// don't override it, base class method simple reads from input and writes to this output // don't override it, base class method simple reads from input and writes to this output
@ -383,8 +278,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void seek(long pos) throws IOException { @Override public void seek(long pos) throws IOException {
delegate.seek(pos); delegate.seek(pos);
// once we seek, digest is not applicable
digest = Digest.NULL_DIGEST;
} }
@Override public long length() throws IOException { @Override public long length() throws IOException {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.shard.service.IndexShard;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -61,7 +62,7 @@ public interface Translog extends IndexShardComponent {
* Creates a new transaction log internally. Note, users of this class should make * Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called. * sure that no operations are performed on the trans log when this is called.
*/ */
void newTranslog(long id) throws TranslogException; void newTranslog() throws TranslogException;
/** /**
* Adds a create operation to the transaction log. * Adds a create operation to the transaction log.
@ -108,6 +109,16 @@ public interface Translog extends IndexShardComponent {
Operation next(); Operation next();
void seekForward(long length); void seekForward(long length);
/**
* Returns a stream of this snapshot.
*/
InputStream stream() throws IOException;
/**
* The length in bytes of this channel.
*/
long lengthInBytes();
} }
/** /**

View File

@ -20,12 +20,15 @@
package org.elasticsearch.index.translog.fs; package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.io.FileChannelInputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
@ -70,6 +73,14 @@ public class FsChannelSnapshot implements Translog.Snapshot {
return this.length; return this.length;
} }
@Override public InputStream stream() throws IOException {
return new FileChannelInputStream(channel, position, lengthInBytes());
}
@Override public long lengthInBytes() {
return length - position;
}
@Override public boolean hasNext() { @Override public boolean hasNext() {
try { try {
if (position > length) { if (position > length) {

View File

@ -26,10 +26,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import java.io.DataInputStream; import java.io.*;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -72,6 +69,14 @@ public class FsStreamSnapshot implements Translog.Snapshot {
return this.length; return this.length;
} }
@Override public InputStream stream() throws IOException {
return dis;
}
@Override public long lengthInBytes() {
return length - position;
}
@Override public boolean hasNext() { @Override public boolean hasNext() {
try { try {
if (position > length) { if (position > length) {

View File

@ -49,7 +49,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final Object mutex = new Object(); private final Object mutex = new Object();
private volatile long id; private volatile long id = 0;
private final AtomicInteger operationCounter = new AtomicInteger(); private final AtomicInteger operationCounter = new AtomicInteger();
@ -87,11 +87,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
return new ByteSizeValue(0, ByteSizeUnit.BYTES); return new ByteSizeValue(0, ByteSizeUnit.BYTES);
} }
@Override public void newTranslog(long id) throws TranslogException { @Override public void newTranslog() throws TranslogException {
synchronized (mutex) { synchronized (mutex) {
operationCounter.set(0); operationCounter.set(0);
lastPosition = 0; lastPosition = 0;
this.id = id; this.id = id + 1;
if (raf != null) { if (raf != null) {
raf.decreaseRefCount(); raf.decreaseRefCount();
} }

View File

@ -1,80 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.memory;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.index.translog.Translog;
import java.util.Iterator;
import java.util.Queue;
/**
* @author kimchy (shay.banon)
*/
public class MemorySnapshot implements Translog.Snapshot {
private final long id;
private final Iterator<Translog.Operation> operationsIt;
private final long length;
private long position = 0;
public MemorySnapshot(long id, Queue<Translog.Operation> operations, long length) {
this.id = id;
this.operationsIt = operations.iterator();
this.length = length;
}
@Override public long translogId() {
return id;
}
@Override public boolean release() throws ElasticSearchException {
return true;
}
@Override public long length() {
return length;
}
@Override public long position() {
return this.position;
}
@Override public boolean hasNext() {
return position < length;
}
@Override public Translog.Operation next() {
Translog.Operation operation = operationsIt.next();
position++;
return operation;
}
@Override public void seekForward(long length) {
long numberToSeek = this.position + length;
while (numberToSeek-- != 0) {
operationsIt.next();
}
this.position += length;
}
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.memory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
@ThreadSafe
public class MemoryTranslog extends AbstractIndexShardComponent implements Translog {
private final Object mutex = new Object();
private final AtomicLong estimatedMemorySize = new AtomicLong();
private volatile long id;
private volatile Queue<Operation> operations;
private final AtomicInteger operationCounter = new AtomicInteger();
@Inject public MemoryTranslog(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
@Override public long currentId() {
return this.id;
}
@Override public int size() {
return operationCounter.get();
}
@Override public ByteSizeValue estimateMemorySize() {
return new ByteSizeValue(estimatedMemorySize.get(), ByteSizeUnit.BYTES);
}
@Override public void newTranslog(long id) {
synchronized (mutex) {
estimatedMemorySize.set(0);
operations = new LinkedTransferQueue<Operation>();
operationCounter.set(0);
this.id = id;
}
}
@Override public void add(Operation operation) throws TranslogException {
operations.add(operation);
operationCounter.incrementAndGet();
estimatedMemorySize.addAndGet(operation.estimateSize() + 50);
}
@Override public Snapshot snapshot() {
synchronized (mutex) {
return new MemorySnapshot(currentId(), operations, operationCounter.get());
}
}
@Override public Snapshot snapshot(Snapshot snapshot) {
synchronized (mutex) {
MemorySnapshot memorySnapshot = (MemorySnapshot) snapshot;
if (currentId() != snapshot.translogId()) {
return snapshot();
}
MemorySnapshot newSnapshot = new MemorySnapshot(currentId(), operations, operationCounter.get());
newSnapshot.seekForward(memorySnapshot.position());
return newSnapshot;
}
}
@Override public void close() {
}
}

View File

@ -324,7 +324,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId()); final DiscoveryNode sourceNode = nodes.get(entry.currentNodeId());
try { try {
// we are recovering a backup from a primary, so no need to mark it as relocated // we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5()); final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService)); recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) { } catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e); handleRecoveryFailure(indexService, shardRouting, true, e);
@ -354,7 +354,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
final DiscoveryNode sourceNode = nodes.get(shardRouting.relocatingNodeId()); final DiscoveryNode sourceNode = nodes.get(shardRouting.relocatingNodeId());
try { try {
// we are recovering a backup from a primary, so no need to mark it as relocated // we are recovering a backup from a primary, so no need to mark it as relocated
final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().listWithMd5()); final StartRecoveryRequest request = new StartRecoveryRequest(indexShard.shardId(), sourceNode, nodes.localNode(), false, indexShard.store().list());
recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService)); recoveryTarget.startRecovery(request, false, new PeerRecoveryListener(request, shardRouting, indexService));
} catch (Exception e) { } catch (Exception e) {
handleRecoveryFailure(indexService, shardRouting, true, e); handleRecoveryFailure(indexService, shardRouting, true, e);

View File

@ -205,10 +205,6 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.field("size_in_bytes", gatewaySnapshotStatus.indexSize().bytes()); builder.field("size_in_bytes", gatewaySnapshotStatus.indexSize().bytes());
builder.endObject(); builder.endObject();
builder.startObject("translog");
builder.field("operations", gatewaySnapshotStatus.translogOperations());
builder.endObject();
builder.endObject(); builder.endObject();
} }

View File

@ -35,11 +35,12 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.ram.RamStore; import org.elasticsearch.index.store.ram.RamStore;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.memory.MemoryTranslog; import org.elasticsearch.index.translog.fs.FsTranslog;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -82,7 +83,7 @@ public abstract class AbstractSimpleEngineTests {
} }
protected Translog createTranslog() { protected Translog createTranslog() {
return new MemoryTranslog(shardId, EMPTY_SETTINGS); return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
} }
protected IndexDeletionPolicy createIndexDeletionPolicy() { protected IndexDeletionPolicy createIndexDeletionPolicy() {

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
import java.util.ArrayList;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class CommitPointsTests {
private final ESLogger logger = Loggers.getLogger(CommitPointsTests.class);
@Test public void testCommitPointXContent() throws Exception {
ArrayList<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100));
indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200));
ArrayList<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100));
translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200));
CommitPoint commitPoint = new CommitPoint(1, "test", CommitPoint.Type.GENERATED, indexFiles, translogFiles);
byte[] serialized = CommitPoints.toXContent(commitPoint);
logger.info("serialized commit_point {}", new String(serialized));
CommitPoint desCp = CommitPoints.fromXContent(serialized);
assertThat(desCp.version(), equalTo(commitPoint.version()));
assertThat(desCp.name(), equalTo(commitPoint.name()));
assertThat(desCp.indexFiles().size(), equalTo(commitPoint.indexFiles().size()));
for (int i = 0; i < desCp.indexFiles().size(); i++) {
assertThat(desCp.indexFiles().get(i).name(), equalTo(commitPoint.indexFiles().get(i).name()));
assertThat(desCp.indexFiles().get(i).physicalName(), equalTo(commitPoint.indexFiles().get(i).physicalName()));
assertThat(desCp.indexFiles().get(i).length(), equalTo(commitPoint.indexFiles().get(i).length()));
}
assertThat(desCp.translogFiles().size(), equalTo(commitPoint.translogFiles().size()));
for (int i = 0; i < desCp.indexFiles().size(); i++) {
assertThat(desCp.translogFiles().get(i).name(), equalTo(commitPoint.translogFiles().get(i).name()));
assertThat(desCp.translogFiles().get(i).physicalName(), equalTo(commitPoint.translogFiles().get(i).physicalName()));
assertThat(desCp.translogFiles().get(i).length(), equalTo(commitPoint.translogFiles().get(i).length()));
}
}
}

View File

@ -38,13 +38,15 @@ import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.ram.RamStore; import org.elasticsearch.index.store.ram.RamStore;
import org.elasticsearch.index.translog.memory.MemoryTranslog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool; import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
@ -72,7 +74,7 @@ public class SimpleIndexShardTests {
SnapshotDeletionPolicy policy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); SnapshotDeletionPolicy policy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Store store = new RamStore(shardId, settings, null); Store store = new RamStore(shardId, settings, null);
MemoryTranslog translog = new MemoryTranslog(shardId, settings); Translog translog = new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
Engine engine = new RobinEngine(shardId, settings, store, policy, translog, Engine engine = new RobinEngine(shardId, settings, store, policy, translog,
new LogByteSizeMergePolicyProvider(store), new SerialMergeSchedulerProvider(shardId, settings), new LogByteSizeMergePolicyProvider(store), new SerialMergeSchedulerProvider(shardId, settings),
analysisService, new SimilarityService(shardId.index())); analysisService, new SimilarityService(shardId.index()));

View File

@ -42,7 +42,7 @@ public abstract class AbstractSimpleTranslogTests {
@BeforeMethod public void setUp() { @BeforeMethod public void setUp() {
translog = create(); translog = create();
translog.newTranslog(0); translog.newTranslog();
} }
@AfterMethod public void tearDown() { @AfterMethod public void tearDown() {
@ -99,7 +99,7 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release(); snapshot.release();
long firstId = translog.currentId(); long firstId = translog.currentId();
translog.newTranslog(1); translog.newTranslog();
assertThat(translog.currentId(), Matchers.not(equalTo(firstId))); assertThat(translog.currentId(), Matchers.not(equalTo(firstId)));
snapshot = translog.snapshot(); snapshot = translog.snapshot();
@ -152,7 +152,7 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Index("test", "2", new byte[]{2})); translog.add(new Translog.Index("test", "2", new byte[]{2}));
translog.newTranslog(2); translog.newTranslog();
translog.add(new Translog.Index("test", "3", new byte[]{3})); translog.add(new Translog.Index("test", "3", new byte[]{3}));

View File

@ -1,35 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.memory;
import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
import org.elasticsearch.index.translog.Translog;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (shay.banon)
*/
public class MemorySimpleTranslogTests extends AbstractSimpleTranslogTests {
@Override protected Translog create() {
return new MemoryTranslog(shardId, EMPTY_SETTINGS);
}
}

View File

@ -39,7 +39,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests { public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests {
@ -82,6 +82,11 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
client("server1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); client("server1").index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
logger.info("Indexing #2"); logger.info("Indexing #2");
client("server1").index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet(); client("server1").index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
// perform snapshot to the index
logger.info("Gateway Snapshot");
client("server1").admin().indices().gatewaySnapshot(gatewaySnapshotRequest("test")).actionGet();
logger.info("Deleting #1"); logger.info("Deleting #1");
client("server1").delete(deleteRequest("test").type("type1").id("1")).actionGet(); client("server1").delete(deleteRequest("test").type("type1").id("1")).actionGet();

View File

@ -107,7 +107,7 @@ public class AbstarctS3BlobContainer extends AbstractBlobContainer {
} }
for (S3ObjectSummary summary : list.getObjectSummaries()) { for (S3ObjectSummary summary : list.getObjectSummaries()) {
String name = summary.getKey().substring(keyPath.length()); String name = summary.getKey().substring(keyPath.length());
blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize(), null)); blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
} }
if (list.isTruncated()) { if (list.isTruncated()) {
prevListing = list; prevListing = list;

View File

@ -22,11 +22,9 @@ package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.blobstore.support.ImmutableAppendableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -92,10 +90,6 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return new S3ImmutableBlobContainer(path, this); return new S3ImmutableBlobContainer(path, this);
} }
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
return new ImmutableAppendableBlobContainer(immutableBlobContainer(path));
}
@Override public void delete(BlobPath path) { @Override public void delete(BlobPath path) {
ObjectListing prevListing = null; ObjectListing prevListing = null;
while (true) { while (true) {

View File

@ -53,7 +53,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
} }
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) { for (FileStatus file : files) {
builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen(), null)); builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
} }
return builder.build(); return builder.build();
} }
@ -69,7 +69,7 @@ public abstract class AbstractHdfsBlobContainer extends AbstractBlobContainer {
} }
ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, BlobMetaData> builder = ImmutableMap.builder();
for (FileStatus file : files) { for (FileStatus file : files) {
builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen(), null)); builder.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
} }
return builder.build(); return builder.build();
} }

View File

@ -1,81 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.io.stream.DataOutputStreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer implements AppendableBlobContainer {
public HdfsAppendableBlobContainer(HdfsBlobStore blobStore, BlobPath blobPath, Path path) {
super(blobStore, blobPath, path);
}
@Override public AppendableBlob appendBlob(String blobName) throws IOException {
return new HdfsAppendableBlob(new Path(path, blobName));
}
private class HdfsAppendableBlob implements AppendableBlob {
private final Path file;
private final FSDataOutputStream fsDataStream;
private final DataOutputStreamOutput out;
public HdfsAppendableBlob(Path file) throws IOException {
this.file = file;
this.fsDataStream = blobStore.fileSystem().create(file, true);
this.out = new DataOutputStreamOutput(fsDataStream);
}
@Override public void append(final AppendBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override public void run() {
try {
listener.withStream(out);
out.flush();
fsDataStream.flush();
fsDataStream.sync();
listener.onCompleted();
} catch (IOException e) {
listener.onFailure(e);
}
}
});
}
@Override public void close() {
try {
fsDataStream.close();
} catch (IOException e) {
// ignore
}
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.blobstore.hdfs;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.blobstore.AppendableBlobContainer;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer; import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
@ -81,10 +80,6 @@ public class HdfsBlobStore implements BlobStore {
return new HdfsImmutableBlobContainer(this, path, buildAndCreate(path)); return new HdfsImmutableBlobContainer(this, path, buildAndCreate(path));
} }
@Override public AppendableBlobContainer appendableBlobContainer(BlobPath path) {
return new HdfsAppendableBlobContainer(this, path, buildAndCreate(path));
}
@Override public void delete(BlobPath path) { @Override public void delete(BlobPath path) {
try { try {
fileSystem.delete(buildPath(path), true); fileSystem.delete(buildPath(path), true);