Internal: Improve checksum process by bulk writing them into a single file instead of checksum file per index file, closes #747.
This commit is contained in:
parent
b629d36d8b
commit
4b92928c77
|
@ -25,6 +25,7 @@
|
|||
<w>charfilter</w>
|
||||
<w>charsets</w>
|
||||
<w>checksum</w>
|
||||
<w>checksums</w>
|
||||
<w>chunking</w>
|
||||
<w>closeable</w>
|
||||
<w>cloudfiles</w>
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
package org.elasticsearch.index.store.support;
|
||||
|
||||
import org.apache.lucene.store.*;
|
||||
import org.elasticsearch.common.Digest;
|
||||
import org.elasticsearch.common.Hex;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.Unicode;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
|
@ -38,8 +36,10 @@ import org.elasticsearch.index.store.StoreFileMetaData;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.zip.CRC32;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -101,6 +101,59 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
return Directories.estimateSize(directory());
|
||||
}
|
||||
|
||||
public static Map<String, String> readChecksums(Directory dir) throws IOException {
|
||||
long lastFound = -1;
|
||||
for (String name : dir.listAll()) {
|
||||
if (!name.startsWith("_checksums-")) {
|
||||
continue;
|
||||
}
|
||||
long current = Long.parseLong(name.substring("_checksums-".length()));
|
||||
if (current > lastFound) {
|
||||
lastFound = current;
|
||||
}
|
||||
}
|
||||
if (lastFound == -1) {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
IndexInput indexInput = dir.openInput("_checksums-" + lastFound);
|
||||
try {
|
||||
indexInput.readInt(); // version
|
||||
return indexInput.readStringStringMap();
|
||||
} finally {
|
||||
indexInput.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void writeChecksums() throws IOException {
|
||||
writeChecksums((StoreDirectory) directory());
|
||||
}
|
||||
|
||||
private void writeChecksums(StoreDirectory dir) throws IOException {
|
||||
String checksumName = "_checksums-" + System.currentTimeMillis();
|
||||
ImmutableMap<String, StoreFileMetaData> files = list();
|
||||
synchronized (mutex) {
|
||||
Map<String, String> checksums = new HashMap<String, String>();
|
||||
for (StoreFileMetaData metaData : files.values()) {
|
||||
if (metaData.checksum() != null) {
|
||||
checksums.put(metaData.name(), metaData.checksum());
|
||||
}
|
||||
}
|
||||
IndexOutput output = dir.createOutput(checksumName, false);
|
||||
output.writeInt(0); // version
|
||||
output.writeStringStringMap(checksums);
|
||||
output.close();
|
||||
}
|
||||
for (StoreFileMetaData metaData : files.values()) {
|
||||
if (metaData.name().startsWith("_checksums") && !checksumName.equals(metaData.name())) {
|
||||
try {
|
||||
directory().deleteFile(metaData.name());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> by default.
|
||||
*/
|
||||
|
@ -117,16 +170,12 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
}
|
||||
|
||||
@Override public void writeChecksum(String name, String checksum) throws IOException {
|
||||
// write the checksum (using the delegate, so we won't checksum this one as well...)
|
||||
IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
|
||||
byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
|
||||
checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
|
||||
checkSumOutput.close();
|
||||
// update the metadata to include the checksum
|
||||
// update the metadata to include the checksum and write a new checksums file
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData metaData = filesMetadata.get(name);
|
||||
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
|
||||
writeChecksums();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,23 +189,28 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
StoreDirectory(Directory delegate) throws IOException {
|
||||
this.delegate = delegate;
|
||||
synchronized (mutex) {
|
||||
Map<String, String> checksums = readChecksums(delegate);
|
||||
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
|
||||
for (String file : delegate.listAll()) {
|
||||
// BACKWARD CKS SUPPORT
|
||||
if (file.endsWith(".cks")) { // ignore checksum files here
|
||||
continue;
|
||||
}
|
||||
// try and load the checksum for the file
|
||||
String checksum = null;
|
||||
if (delegate.fileExists(file + ".cks")) {
|
||||
IndexInput indexInput = delegate.openInput(file + ".cks");
|
||||
try {
|
||||
if (indexInput.length() > 0) {
|
||||
byte[] checksumBytes = new byte[(int) indexInput.length()];
|
||||
indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
|
||||
checksum = Unicode.fromBytes(checksumBytes);
|
||||
String checksum = checksums.get(file);
|
||||
|
||||
// BACKWARD CKS SUPPORT
|
||||
if (checksum == null) {
|
||||
if (delegate.fileExists(file + ".cks")) {
|
||||
IndexInput indexInput = delegate.openInput(file + ".cks");
|
||||
try {
|
||||
if (indexInput.length() > 0) {
|
||||
byte[] checksumBytes = new byte[(int) indexInput.length()];
|
||||
indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
|
||||
checksum = Unicode.fromBytes(checksumBytes);
|
||||
}
|
||||
} finally {
|
||||
indexInput.close();
|
||||
}
|
||||
} finally {
|
||||
indexInput.close();
|
||||
}
|
||||
}
|
||||
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
|
||||
|
@ -203,11 +257,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
|
||||
@Override public void deleteFile(String name) throws IOException {
|
||||
delegate.deleteFile(name);
|
||||
try {
|
||||
delegate.deleteFile(name + ".cks");
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
synchronized (mutex) {
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
|
||||
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
|
||||
|
@ -232,14 +281,6 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
|
||||
public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
|
||||
IndexOutput out = delegate.createOutput(name);
|
||||
// delete the relevant cks file for an existing file, if exists
|
||||
if (filesMetadata.containsKey(name)) {
|
||||
try {
|
||||
delegate.deleteFile(name + ".cks");
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
|
||||
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
|
||||
|
@ -288,6 +329,10 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
if (sync) {
|
||||
delegate.sync(name);
|
||||
}
|
||||
// write the checksums file when we sync on the segments file (committed)
|
||||
if (!name.equals("segments.gen") && name.startsWith("segments")) {
|
||||
writeChecksums();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void forceSync(String name) throws IOException {
|
||||
|
@ -301,7 +346,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
|
||||
private final String name;
|
||||
|
||||
private final MessageDigest digest;
|
||||
private final Checksum digest;
|
||||
|
||||
StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
|
||||
this.delegate = delegate;
|
||||
|
@ -315,7 +360,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
// and since we, in any case, always recover the segments files
|
||||
this.digest = null;
|
||||
} else {
|
||||
this.digest = Digest.getMd5Digest();
|
||||
this.digest = new CRC32();
|
||||
}
|
||||
} else {
|
||||
this.digest = null;
|
||||
|
@ -326,11 +371,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
|
|||
delegate.close();
|
||||
String checksum = null;
|
||||
if (digest != null) {
|
||||
checksum = Hex.encodeHexString(digest.digest());
|
||||
IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
|
||||
byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
|
||||
checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
|
||||
checkSumOutput.close();
|
||||
checksum = Long.toString(digest.getValue(), Character.MAX_RADIX);
|
||||
}
|
||||
synchronized (mutex) {
|
||||
StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), checksum);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.indices.store;
|
||||
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.index.service.IndexService;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.index.store.support.AbstractStore;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -162,10 +164,32 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
|||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
}
|
||||
Map<String, StoreFileMetaData> files = Maps.newHashMap();
|
||||
// read the checksums file
|
||||
FSDirectory directory = FSDirectory.open(indexFile);
|
||||
try {
|
||||
Map<String, String> checksums = AbstractStore.readChecksums(directory);
|
||||
for (File file : indexFile.listFiles()) {
|
||||
// BACKWARD CKS SUPPORT
|
||||
if (file.getName().endsWith(".cks")) {
|
||||
continue;
|
||||
}
|
||||
if (file.getName().startsWith("_checksums")) {
|
||||
continue;
|
||||
}
|
||||
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName())));
|
||||
}
|
||||
} finally {
|
||||
directory.close();
|
||||
}
|
||||
|
||||
// BACKWARD CKS SUPPORT
|
||||
for (File file : indexFile.listFiles()) {
|
||||
if (file.getName().endsWith(".cks")) {
|
||||
continue;
|
||||
}
|
||||
if (file.getName().startsWith("_checksums")) {
|
||||
continue;
|
||||
}
|
||||
// try and load the checksum
|
||||
String checksum = null;
|
||||
File checksumFile = new File(file.getParentFile(), file.getName() + ".cks");
|
||||
|
@ -177,6 +201,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
|||
}
|
||||
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum));
|
||||
}
|
||||
|
||||
return new StoreFilesMetaData(false, shardId, files);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue