Possible (rare) shard index corruption / different doc count on recovery (gateway / shard), closes #466.

This commit is contained in:
kimchy 2010-11-01 23:00:16 +02:00
parent 49439a09e2
commit 4ff1b429f1
18 changed files with 570 additions and 70 deletions
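In outline, the fix adds a per-file MD5 checksum: it is computed while store files are written, persisted next to each file in a ".cks" sidecar, carried in commit points and in peer-recovery file chunks, and preferred over the old length-only comparison when deciding whether an existing copy of a file can be reused. A minimal sketch of the comparison rule the new isSame() methods implement (simplified names, not the committed signatures):

    // Sketch only: prefer the checksum when both sides have one, otherwise fall
    // back to comparing lengths (the previous behaviour).
    static boolean sameFile(String localChecksum, long localLength,
                            String remoteChecksum, long remoteLength) {
        if (localChecksum != null && remoteChecksum != null) {
            return localChecksum.equals(remoteChecksum);
        }
        return localLength == remoteLength;
    }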

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.checksum;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.security.MessageDigest;
import java.util.zip.CRC32;
/**
* @author kimchy (shay.banon)
*/
public class ChecksumBenchmarkTest {
public static final int BATCH_SIZE = 16 * 1024;
public static void main(String[] args) {
long dataSize = ByteSizeValue.parseBytesSizeValue("1g", null).bytes();
crc(dataSize);
md5(dataSize);
}
private static void crc(long dataSize) {
long start = System.currentTimeMillis();
CRC32 crc = new CRC32();
byte[] data = new byte[BATCH_SIZE];
long iter = dataSize / BATCH_SIZE;
for (long i = 0; i < iter; i++) {
crc.update(data);
}
crc.getValue();
System.out.println("CRC took " + new TimeValue(System.currentTimeMillis() - start));
}
private static void md5(long dataSize) {
long start = System.currentTimeMillis();
byte[] data = new byte[BATCH_SIZE];
long iter = dataSize / BATCH_SIZE;
MessageDigest digest = Digest.getMd5Digest();
for (long i = 0; i < iter; i++) {
digest.update(data);
}
digest.digest();
System.out.println("md5 took " + new TimeValue(System.currentTimeMillis() - start));
}
}
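The benchmark above hashes 1 GB of zeroed 16 KB buffers to compare CRC32 against MD5 (the digest the store ends up using). If one wanted a third data point, an Adler32 variant following the same pattern might look like this (an illustrative addition, not part of the commit):

    private static void adler32(long dataSize) {
        long start = System.currentTimeMillis();
        java.util.zip.Adler32 adler = new java.util.zip.Adler32();
        byte[] data = new byte[BATCH_SIZE];
        long iter = dataSize / BATCH_SIZE;
        for (long i = 0; i < iter; i++) {
            adler.update(data);
        }
        adler.getValue();
        System.out.println("Adler32 took " + new TimeValue(System.currentTimeMillis() - start));
    }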

View File

@ -174,27 +174,15 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
break;
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
sb.append(" gateway_files:\n");
for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
sb.append(" [").append(fileInfo.name()).append("]/[").append(fileInfo.physicalName()).append("], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
}
sb.append(" node_files:\n");
for (StoreFileMetaData md : storeFilesMetaData) {
sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n");
}
logger.trace(sb.toString());
}
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(storeFileMetaData.name());
if (fileInfo != null) {
if (fileInfo.length() == storeFileMetaData.length()) {
logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.length()));
if (fileInfo.isSame(storeFileMetaData)) {
logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway", shard, storeFileMetaData.name());
sizeMatched += storeFileMetaData.length();
} else {
logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different size, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.length(), fileInfo.length());
logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but is different", shard, storeFileMetaData.name());
}
} else {
logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name());
@ -224,7 +212,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}

View File

@ -315,7 +315,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
long sizeMatched = 0;
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) {
sizeMatched += storeFileMetaData.length();
}
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.index.store.StoreFileMetaData;
import javax.annotation.Nullable;
import java.util.List;
/**
@ -34,11 +36,13 @@ public class CommitPoint {
private final String name;
private final String physicalName;
private final long length;
private final String checksum;
public FileInfo(String name, String physicalName, long length) {
public FileInfo(String name, String physicalName, long length, String checksum) {
this.name = name;
this.physicalName = physicalName;
this.length = length;
this.checksum = checksum;
}
public String name() {
@ -52,6 +56,17 @@ public class CommitPoint {
public long length() {
return length;
}
@Nullable public String checksum() {
return checksum;
}
public boolean isSame(StoreFileMetaData md) {
if (checksum != null && md.checksum() != null) {
return checksum.equals(md.checksum());
}
return length == md.length();
}
}
public static enum Type {

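A hypothetical use of the new isSame() check, assuming the FileInfo and StoreFileMetaData constructors introduced in this commit (the names and values below are made up):

    CommitPoint.FileInfo gatewayFile =
            new CommitPoint.FileInfo("__1", "_0.cfs", 1024, "9a0364b9e99bb480dd25e1f0284c8555");
    StoreFileMetaData localFile =
            new StoreFileMetaData("_0.cfs", 1024, System.currentTimeMillis(), "9a0364b9e99bb480dd25e1f0284c8555");
    boolean reuse = gatewayFile.isSame(localFile); // true: both checksums are present and equal
    // if either checksum were null, isSame() would fall back to comparing lengths only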
View File

@ -97,6 +97,9 @@ public class CommitPoints implements Iterable<CommitPoint> {
builder.startObject(fileInfo.name());
builder.field("physical_name", fileInfo.physicalName());
builder.field("length", fileInfo.length());
if (fileInfo.checksum() != null) {
builder.field("checksum", fileInfo.checksum());
}
builder.endObject();
}
builder.endObject();
@ -147,6 +150,7 @@ public class CommitPoints implements Iterable<CommitPoint> {
String fileName = currentFieldName;
String physicalName = null;
long size = -1;
String checksum = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -155,6 +159,8 @@ public class CommitPoints implements Iterable<CommitPoint> {
physicalName = parser.text();
} else if ("length".equals(currentFieldName)) {
size = parser.longValue();
} else if ("checksum".equals(currentFieldName)) {
checksum = parser.text();
}
}
}
@ -164,7 +170,7 @@ public class CommitPoints implements Iterable<CommitPoint> {
if (size == -1) {
throw new IOException("Malformed commit, missing length for [" + fileName + "]");
}
files.add(new CommitPoint.FileInfo(fileName, physicalName, size));
files.add(new CommitPoint.FileInfo(fileName, physicalName, size, checksum));
}
}
} else if (token.isValue()) {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.threadpool.ThreadPool;
@ -181,9 +182,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
int indexNumberOfFiles = 0;
long indexTotalFilesSize = 0;
for (final String fileName : snapshotIndexCommit.getFiles()) {
long fileLength = 0;
StoreFileMetaData md;
try {
fileLength = store.directory().fileLength(fileName);
md = store.metaData(fileName);
} catch (IOException e) {
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
@ -194,17 +195,17 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
CommitPoint.FileInfo fileInfo = commitPoints.findPhysicalIndexFile(fileName);
if (fileInfo == null || fileInfo.length() != fileLength || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
if (fileInfo == null || !fileInfo.isSame(md) || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
// commit point file does not exist in any commit point, is different, or does not fully exist in the listed blobs
snapshotRequired = true;
}
if (snapshotRequired) {
indexNumberOfFiles++;
indexTotalFilesSize += fileLength;
indexTotalFilesSize += md.length();
// create a new FileInfo
try {
CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), fileName, fileLength);
CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), md.checksum());
indexCommitPointFiles.add(snapshotFileInfo);
snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileInfo, indexLatch, failures);
} catch (IOException e) {
@ -280,7 +281,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
currentSnapshotStatus.translog().expectedNumberOfOperations(expectedNumberOfOperations);
if (snapshotRequired) {
CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes());
CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes(), null /* no need for checksum in translog */);
translogCommitPointFiles.add(addedTranslogFileInfo);
try {
snapshotTranslog(translogSnapshot, addedTranslogFileInfo);
@ -520,26 +521,26 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
List<CommitPoint.FileInfo> filesToRecover = Lists.newArrayList();
for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
String fileName = fileInfo.physicalName();
long fileLength = -1;
StoreFileMetaData md = null;
try {
fileLength = store.directory().fileLength(fileName);
md = store.metaData(fileName);
} catch (Exception e) {
// no file
}
if (!fileName.contains("segment") && fileLength == fileInfo.length()) {
if (!fileName.contains("segment") && md != null && fileInfo.isSame(md)) {
numberOfFiles++;
totalSize += fileLength;
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += fileLength;
reusedTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("not_recovering [{}], exists in local store and has same length [{}]", fileInfo.physicalName(), fileInfo.length());
logger.trace("not_recovering [{}], exists in local store and is same", fileInfo.physicalName());
}
} else {
if (logger.isTraceEnabled()) {
if (fileLength == -1) {
if (md == null) {
logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
} else {
logger.trace("recovering [{}], exists in local store but has different length: gateway [{}], local [{}]", fileInfo.physicalName(), fileInfo.length(), fileLength);
logger.trace("recovering [{}], exists in local store but is different", fileInfo.physicalName());
}
}
numberOfFiles++;
@ -591,12 +592,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (!commitPoint.containPhysicalIndexFile(storeFile)) {
try {
store.directory().deleteFile(storeFile);
} catch (IOException e) {
} catch (Exception e) {
// ignore
}
}
}
} catch (IOException e) {
} catch (Exception e) {
// ignore
}
}
@ -604,7 +605,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private void recoverFile(final CommitPoint.FileInfo fileInfo, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
final IndexOutput indexOutput;
try {
indexOutput = store.directory().createOutput(fileInfo.physicalName());
// we create an output with no checksum: the stored checksum covers the bytes originally written
// to the stream, not the raw file bytes copied here (because of seek), so we write the checksum file once copying is done
indexOutput = store.createOutputWithNoChecksum(fileInfo.physicalName());
} catch (IOException e) {
failures.add(e);
latch.countDown();
@ -641,6 +644,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// we are done...
try {
indexOutput.close();
// write the checksum
if (fileInfo.checksum() != null) {
store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
}
store.directory().sync(fileInfo.physicalName());
} catch (IOException e) {
onFailure(e);
return;

View File

@ -81,7 +81,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
}
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus().index().startTime(System.currentTimeMillis());
recoveryStatus.index().startTime(System.currentTimeMillis());
long version = -1;
try {
if (IndexReader.indexExists(indexShard.store().directory())) {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@ -35,17 +36,19 @@ class RecoveryFileChunkRequest implements Streamable {
private String name;
private long position;
private long length;
private String checksum;
private byte[] content;
private int contentLength;
RecoveryFileChunkRequest() {
}
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, byte[] content, int contentLength) {
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, byte[] content, int contentLength) {
this.shardId = shardId;
this.name = name;
this.position = position;
this.length = length;
this.checksum = checksum;
this.content = content;
this.contentLength = contentLength;
}
@ -62,6 +65,10 @@ class RecoveryFileChunkRequest implements Streamable {
return position;
}
@Nullable public String checksum() {
return this.checksum;
}
public long length() {
return length;
}
@ -85,6 +92,9 @@ class RecoveryFileChunkRequest implements Streamable {
name = in.readUTF();
position = in.readVLong();
length = in.readVLong();
if (in.readBoolean()) {
checksum = in.readUTF();
}
contentLength = in.readVInt();
content = new byte[contentLength];
in.readFully(content);
@ -95,6 +105,12 @@ class RecoveryFileChunkRequest implements Streamable {
out.writeUTF(name);
out.writeVLong(position);
out.writeVLong(length);
if (checksum == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(checksum);
}
out.writeVInt(contentLength);
out.writeBytes(content, 0, contentLength);
}
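The checksum travels as an optional field: a presence flag followed by the UTF string when it is non-null, read back symmetrically in readFrom(). StoreFileMetaData below uses the same encoding. As a standalone illustration of the pattern with plain java.io streams (not Elasticsearch's StreamInput/StreamOutput):

    import java.io.*;

    // Hypothetical helper mirroring the writeBoolean/writeUTF pattern above.
    class OptionalStringCodec {
        static byte[] write(String checksum) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (checksum == null) {
                out.writeBoolean(false); // absent, e.g. translog files carry no checksum
            } else {
                out.writeBoolean(true);
                out.writeUTF(checksum);
            }
            out.close();
            return bytes.toByteArray();
        }

        static String read(byte[] data) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readBoolean() ? in.readUTF() : null;
        }
    }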

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
@ -96,28 +97,28 @@ public class RecoverySource extends AbstractComponent {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
long length = shard.store().directory().fileLength(name);
StoreFileMetaData md = shard.store().metaData(name);
boolean useExisting = false;
if (request.existingFiles().containsKey(name)) {
if (!name.contains("segment") && length == request.existingFiles().get(name).length()) {
if (!name.contains("segment") && md.isSame(request.existingFiles().get(name))) {
response.phase1ExistingFileNames.add(name);
response.phase1ExistingFileSizes.add(length);
existingTotalSize += length;
response.phase1ExistingFileSizes.add(md.length());
existingTotalSize += md.length();
useExisting = true;
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, length);
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.checksum(), md.length());
}
}
}
if (!useExisting) {
if (request.existingFiles().containsKey(name)) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different length: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name).length(), length);
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name), md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
}
response.phase1FileNames.add(name);
response.phase1FileSizes.add(length);
totalSize += length;
response.phase1FileSizes.add(md.length());
totalSize += md.length();
}
}
response.phase1TotalSize = totalSize;
@ -138,6 +139,7 @@ public class RecoverySource extends AbstractComponent {
try {
final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE];
StoreFileMetaData md = shard.store().metaData(name);
indexInput = snapshot.getDirectory().openInput(name);
long len = indexInput.length();
long readCount = 0;
@ -148,7 +150,7 @@ public class RecoverySource extends AbstractComponent {
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead),
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}

View File

@ -364,7 +364,7 @@ public class RecoveryTarget extends AbstractComponent {
if (!request.snapshotFiles().contains(existingFile)) {
try {
shard.store().directory().deleteFile(existingFile);
} catch (IOException e) {
} catch (Exception e) {
// ignore, we don't really care, will get deleted later on
}
}
@ -398,7 +398,9 @@ public class RecoveryTarget extends AbstractComponent {
// ignore
}
}
indexOutput = shard.store().directory().createOutput(request.name());
// we create an output with no checksum: the stored checksum covers the bytes originally written
// to the stream, not the raw file bytes copied here (because of seek), so we write the checksum file once copying is done
indexOutput = shard.store().createOutputWithNoChecksum(request.name());
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
} else {
indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
@ -414,6 +416,11 @@ public class RecoveryTarget extends AbstractComponent {
if (indexOutput.getFilePointer() == request.length()) {
// we are done
indexOutput.close();
// write the checksum
if (request.checksum() != null) {
shard.store().writeChecksum(request.name(), request.checksum());
}
shard.store().directory().sync(request.name());
onGoingRecovery.openIndexOutputs.remove(request.name());
}
} catch (IOException e) {
@ -423,6 +430,7 @@ public class RecoveryTarget extends AbstractComponent {
} catch (IOException e1) {
// ignore
}
throw e;
}
}
channel.sendResponse(VoidStreamable.INSTANCE);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.shard.service;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.FilterClause;
@ -188,10 +189,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(shardId);
}
engine.start();
if (checkIndex) {
checkIndex(true);
}
engine.start();
scheduleRefresherIfNeeded();
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
@ -436,6 +437,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
// also check here, before we apply the translog
if (checkIndex) {
checkIndex(true);
}
engine.start();
}
@ -455,9 +460,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (withFlush) {
engine.flush(new Engine.Flush());
}
if (checkIndex) {
checkIndex(true);
}
synchronized (mutex) {
logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
state = IndexShardState.STARTED;
@ -576,6 +578,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private void checkIndex(boolean throwException) throws IndexShardException {
try {
if (!IndexReader.indexExists(store.directory())) {
return;
}
CheckIndex checkIndex = new CheckIndex(store.directory());
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
PrintStream out = new PrintStream(os);

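The shard now runs Lucene's CheckIndex before the engine starts and before the translog is replayed, and skips the check when no index exists yet. For reference, a minimal standalone use of the Lucene 3.x CheckIndex API that this code relies on might look like the following sketch (the path is hypothetical, and this is not the shard's actual checkIndex(boolean) method):

    Directory dir = FSDirectory.open(new File("/path/to/shard/index")); // hypothetical location
    CheckIndex checkIndex = new CheckIndex(dir);
    checkIndex.setInfoStream(System.out);
    CheckIndex.Status status = checkIndex.checkIndex();
    if (!status.clean) {
        // the shard code logs the report and throws; fixIndex(status) could
        // alternatively drop the broken segments
    }
    dir.close();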
View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.shard.IndexShardComponent;
@ -36,6 +37,12 @@ public interface Store extends IndexShardComponent {
*/
Directory directory();
IndexOutput createOutputWithNoChecksum(String name) throws IOException;
void writeChecksum(String name, String checksum) throws IOException;
StoreFileMetaData metaData(String name) throws IOException;
ImmutableMap<String, StoreFileMetaData> list() throws IOException;
/**

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@ -36,13 +37,16 @@ public class StoreFileMetaData implements Streamable {
private long length;
private String checksum;
StoreFileMetaData() {
}
public StoreFileMetaData(String name, long length, long lastModified) {
public StoreFileMetaData(String name, long length, long lastModified, String checksum) {
this.name = name;
this.lastModified = lastModified;
this.length = length;
this.checksum = checksum;
}
public String name() {
@ -57,6 +61,17 @@ public class StoreFileMetaData implements Streamable {
return length;
}
@Nullable public String checksum() {
return this.checksum;
}
public boolean isSame(StoreFileMetaData other) {
if (checksum != null && other.checksum != null) {
return checksum.equals(other.checksum);
}
return length == other.length;
}
public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException {
StoreFileMetaData md = new StoreFileMetaData();
md.readFrom(in);
@ -64,16 +79,25 @@ public class StoreFileMetaData implements Streamable {
}
@Override public String toString() {
return "name [" + name + "], length [" + length + "]";
return "name [" + name + "], length [" + length + "], checksum [" + checksum + "]";
}
@Override public void readFrom(StreamInput in) throws IOException {
name = in.readUTF();
length = in.readVLong();
if (in.readBoolean()) {
checksum = in.readUTF();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(name);
out.writeVLong(length);
if (checksum == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(checksum);
}
}
}

View File

@ -20,7 +20,10 @@
package org.elasticsearch.index.store.support;
import org.apache.lucene.store.*;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.Hex;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.lucene.Directories;
@ -35,6 +38,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Map;
/**
@ -73,7 +77,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return builder.build();
}
private StoreFileMetaData metaData(String name) throws IOException {
public StoreFileMetaData metaData(String name) throws IOException {
StoreFileMetaData md = filesMetadata.get(name);
if (md == null) {
return null;
@ -108,6 +112,24 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
directory().close();
}
@Override public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
return ((StoreDirectory) directory()).createOutput(name, false);
}
@Override public void writeChecksum(String name, String checksum) throws IOException {
// write the checksum (using the delegate, so we won't checksum this one as well...)
IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
checkSumOutput.close();
// update the metadata to include the checksum
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
}
}
/**
* The idea of the store directory is to cache file-level metadata, as well as the md5 checksum of each file
*/
@ -120,13 +142,34 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) {
MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
for (String file : delegate.listAll()) {
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file)));
if (file.endsWith(".cks")) { // ignore checksum files here
continue;
}
// try and load the checksum for the file
String checksum = null;
if (delegate.fileExists(file + ".cks")) {
IndexInput indexInput = delegate.openInput(file + ".cks");
try {
if (indexInput.length() > 0) {
byte[] checksumBytes = new byte[(int) indexInput.length()];
indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
checksum = Unicode.fromBytes(checksumBytes);
}
} finally {
indexInput.close();
}
}
builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
}
filesMetadata = builder.immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
}
public Directory delegate() {
return delegate;
}
@Override public String[] listAll() throws IOException {
return files;
}
@ -152,7 +195,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) {
StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) {
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name));
metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum());
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
}
}
@ -160,6 +203,11 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void deleteFile(String name) throws IOException {
delegate.deleteFile(name);
try {
delegate.deleteFile(name + ".cks");
} catch (Exception e) {
// ignore
}
synchronized (mutex) {
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@ -179,13 +227,23 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
}
@Override public IndexOutput createOutput(String name) throws IOException {
return createOutput(name, true);
}
public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
IndexOutput out = delegate.createOutput(name);
// delete the relevant cks file for an existing file, if it exists
try {
delegate.deleteFile(name + ".cks");
} catch (Exception e) {
// ignore
}
synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1);
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
return new StoreIndexOutput(out, name);
return new StoreIndexOutput(out, name, computeChecksum);
}
@Override public IndexInput openInput(String name) throws IOException {
@ -227,29 +285,63 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void sync(String name) throws IOException {
if (sync) {
delegate.sync(name);
try {
if (delegate.fileExists(name + ".cks")) {
delegate.sync(name + ".cks");
}
} catch (Exception e) {
//ignore
}
}
}
@Override public void forceSync(String name) throws IOException {
delegate.sync(name);
try {
if (delegate.fileExists(name + ".cks")) {
delegate.sync(name + ".cks");
}
} catch (Exception e) {
//ignore
}
}
}
private class StoreIndexOutput extends IndexOutput {
class StoreIndexOutput extends IndexOutput {
private final IndexOutput delegate;
private final String name;
private StoreIndexOutput(IndexOutput delegate, String name) {
private final MessageDigest digest;
StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
this.delegate = delegate;
this.name = name;
if (computeChecksum) {
if ("segments.gen".equals(name)) {
// no need to create a checksum for segments.gen since it is not snapshotted for recovery
this.digest = null;
} else {
this.digest = Digest.getMd5Digest();
}
} else {
this.digest = null;
}
}
@Override public void close() throws IOException {
delegate.close();
String checksum = null;
if (digest != null) {
checksum = Hex.encodeHexString(digest.digest());
IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
checkSumOutput.close();
}
synchronized (mutex) {
StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name));
StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), checksum);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
@ -257,10 +349,16 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
if (digest != null) {
digest.update(b);
}
}
@Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
if (digest != null) {
digest.update(b, offset, length);
}
}
// don't override it, the base class method simply reads from the input and writes to this output
@ -277,6 +375,9 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
}
@Override public void seek(long pos) throws IOException {
// seek might be called on this output, which means the checksum is not a checksum of the
// final file bytes but of the bytes written to this stream; that write pattern is the same
// for each type of file in lucene, so the checksums remain comparable
delegate.seek(pos);
}
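The write path above is the core of the change: every byte written through StoreIndexOutput also feeds an MD5 digest, and the hex digest is persisted to a ".cks" sidecar when the output is closed; as the seek() comment notes, the digest covers the bytes written to the stream rather than the file's final on-disk layout. A self-contained sketch of the same pattern using plain java.io (illustration only, not the committed Lucene IndexOutput wrapper):

    import java.io.*;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Hypothetical stream that digests everything written to it and stores the
    // checksum in a ".cks" sidecar file on close, similar in spirit to StoreIndexOutput.
    class ChecksummingOutputStream extends FilterOutputStream {
        private final MessageDigest digest;
        private final File sidecar;

        ChecksummingOutputStream(File target) throws IOException {
            super(new FileOutputStream(target));
            this.sidecar = new File(target.getParentFile(), target.getName() + ".cks");
            try {
                this.digest = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IOException(e);
            }
        }

        @Override public void write(int b) throws IOException {
            out.write(b);
            digest.update((byte) b);
        }

        @Override public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
            digest.update(b, off, len);
        }

        @Override public void close() throws IOException {
            super.close(); // flushes and closes the underlying file
            StringBuilder hex = new StringBuilder();
            for (byte value : digest.digest()) {
                hex.append(String.format("%02x", value & 0xff));
            }
            FileOutputStream cks = new FileOutputStream(sidecar);
            try {
                cks.write(hex.toString().getBytes("UTF-8"));
            } finally {
                cks.close();
            }
        }
    }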

View File

@ -27,10 +27,12 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -157,7 +159,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
}
Map<String, StoreFileMetaData> files = Maps.newHashMap();
for (File file : indexFile.listFiles()) {
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
if (file.getName().endsWith(".cks")) {
continue;
}
// try and load the checksum
String checksum = null;
File checksumFile = new File(file.getParentFile(), file.getName() + ".cks");
if (checksumFile.exists()) {
byte[] checksumBytes = Streams.copyToByteArray(checksumFile);
if (checksumBytes.length > 0) {
checksum = Unicode.fromBytes(checksumBytes);
}
}
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum));
}
return new StoreFilesMetaData(false, shardId, files);
}

View File

@ -38,12 +38,12 @@ public class CommitPointsTests {
@Test public void testCommitPointXContent() throws Exception {
ArrayList<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100));
indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200));
indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100, "ck1"));
indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200, "ck2"));
ArrayList<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100));
translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200));
translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100, null));
translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200, null));
CommitPoint commitPoint = new CommitPoint(1, "test", CommitPoint.Type.GENERATED, indexFiles, translogFiles);
@ -59,6 +59,7 @@ public class CommitPointsTests {
assertThat(desCp.indexFiles().get(i).name(), equalTo(commitPoint.indexFiles().get(i).name()));
assertThat(desCp.indexFiles().get(i).physicalName(), equalTo(commitPoint.indexFiles().get(i).physicalName()));
assertThat(desCp.indexFiles().get(i).length(), equalTo(commitPoint.indexFiles().get(i).length()));
assertThat(desCp.indexFiles().get(i).checksum(), equalTo(commitPoint.indexFiles().get(i).checksum()));
}
assertThat(desCp.translogFiles().size(), equalTo(commitPoint.translogFiles().size()));
@ -66,6 +67,7 @@ public class CommitPointsTests {
assertThat(desCp.translogFiles().get(i).name(), equalTo(commitPoint.translogFiles().get(i).name()));
assertThat(desCp.translogFiles().get(i).physicalName(), equalTo(commitPoint.translogFiles().get(i).physicalName()));
assertThat(desCp.translogFiles().get(i).length(), equalTo(commitPoint.translogFiles().get(i).length()));
assertThat(desCp.translogFiles().get(i).checksum(), nullValue());
}
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.stress.fullrestart;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
/**
* @author kimchy (shay.banon)
*/
public class FullRestartStressTest {
private final ESLogger logger = Loggers.getLogger(getClass());
private int numberOfNodes = 4;
private boolean clearNodeWork = false;
private int numberOfIndices = 5;
private int textTokens = 150;
private int numberOfFields = 10;
private int bulkSize = 1000;
private int numberOfDocsPerRound = 50000;
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
private TimeValue period = TimeValue.timeValueMinutes(20);
private AtomicLong indexCounter = new AtomicLong();
public FullRestartStressTest numberOfNodes(int numberOfNodes) {
this.numberOfNodes = numberOfNodes;
return this;
}
public FullRestartStressTest numberOfIndices(int numberOfIndices) {
this.numberOfIndices = numberOfIndices;
return this;
}
public FullRestartStressTest textTokens(int textTokens) {
this.textTokens = textTokens;
return this;
}
public FullRestartStressTest numberOfFields(int numberOfFields) {
this.numberOfFields = numberOfFields;
return this;
}
public FullRestartStressTest bulkSize(int bulkSize) {
this.bulkSize = bulkSize;
return this;
}
public FullRestartStressTest numberOfDocsPerRound(int numberOfDocsPerRound) {
this.numberOfDocsPerRound = numberOfDocsPerRound;
return this;
}
public FullRestartStressTest settings(Settings settings) {
this.settings = settings;
return this;
}
public FullRestartStressTest period(TimeValue period) {
this.period = period;
return this;
}
public FullRestartStressTest clearNodeWork(boolean clearNodeWork) {
this.clearNodeWork = clearNodeWork;
return this;
}
public void run() throws Exception {
long numberOfRounds = 0;
long testStart = System.currentTimeMillis();
while (true) {
Node[] nodes = new Node[numberOfNodes];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node();
}
Node client = NodeBuilder.nodeBuilder().settings(settings).client(true).node();
// make sure the indices exist (they might already have been created in a previous round)
for (int i = 0; i < numberOfIndices; i++) {
try {
client.client().admin().indices().prepareCreate("test" + i).execute().actionGet();
} catch (Exception e) {
// might already exist, fine
}
}
logger.info("*** Waiting for GREEN status");
try {
ClusterHealthResponse clusterHealth = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
if (clusterHealth.timedOut()) {
logger.warn("timed out waiting for green status....");
}
} catch (Exception e) {
logger.warn("failed to execute cluster health....");
}
CountResponse count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
logger.info("*** index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
// verify count
for (int i = 0; i < (nodes.length * 5); i++) {
count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
logger.debug("index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
if (count.count() != indexCounter.get()) {
logger.warn("!!! count does not match, index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
throw new Exception("failed test, count does not match...");
}
}
// verify search
for (int i = 0; i < (nodes.length * 5); i++) {
// do a search with norms field, so we don't rely on match all filtering cache
SearchResponse search = client.client().prepareSearch().setQuery(matchAllQuery().normsField("field")).execute().actionGet();
logger.debug("index_count [{}], expected_count [{}]", search.hits().totalHits(), indexCounter.get());
if (search.hits().totalHits() != indexCounter.get()) {
logger.warn("!!! search does not match, index_count [{}], expected_count [{}]", search.hits().totalHits(), indexCounter.get());
throw new Exception("failed test, count does not match...");
}
}
logger.info("*** ROUND {}", ++numberOfRounds);
// bulk index data
int numberOfBulks = numberOfDocsPerRound / bulkSize;
for (int b = 0; b < numberOfBulks; b++) {
BulkRequestBuilder bulk = client.client().prepareBulk();
for (int k = 0; k < bulkSize; k++) {
StringBuffer sb = new StringBuffer();
XContentBuilder json = XContentFactory.jsonBuilder().startObject()
.field("field", "value" + ThreadLocalRandom.current().nextInt());
int fields = ThreadLocalRandom.current().nextInt() % numberOfFields;
for (int i = 0; i < fields; i++) {
json.field("num_" + i, ThreadLocalRandom.current().nextDouble());
int tokens = ThreadLocalRandom.current().nextInt() % textTokens;
sb.setLength(0);
for (int j = 0; j < tokens; j++) {
sb.append(UUID.randomBase64UUID()).append(' ');
}
json.field("text_" + i, sb.toString());
}
json.endObject();
bulk.add(Requests.indexRequest("test" + (Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfIndices)).type("type1").source(json));
indexCounter.incrementAndGet();
}
bulk.execute().actionGet();
}
client.client().admin().indices().prepareGatewaySnapshot().execute().actionGet();
client.close();
for (Node node : nodes) {
File nodeWork = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeLocation();
node.close();
if (clearNodeWork && !settings.get("gateway.type").equals("local")) {
FileSystemUtils.deleteRecursively(nodeWork);
}
}
if ((System.currentTimeMillis() - testStart) > period.millis()) {
logger.info("test finished, full_restart_rounds [{}]", numberOfRounds);
break;
}
}
}
public static void main(String[] args) throws Exception {
System.setProperty("es.logger.prefix", "");
int numberOfNodes = 2;
Settings settings = ImmutableSettings.settingsBuilder()
.put("index.shard.check_index", true)
.put("gateway.type", "fs")
.put("gateway.recover_after_nodes", numberOfNodes)
.put("index.number_of_shards", 1)
.build();
FullRestartStressTest test = new FullRestartStressTest()
.settings(settings)
.period(TimeValue.timeValueMinutes(20))
.clearNodeWork(false) // only applies to shared gateway
.numberOfNodes(numberOfNodes)
.numberOfIndices(1)
.textTokens(150)
.numberOfFields(10)
.bulkSize(1000)
.numberOfDocsPerRound(10000);
test.run();
}
}

View File

@ -104,7 +104,7 @@ public class RollingRestartStressTest {
return this;
}
public RollingRestartStressTest cleanNodeWork(boolean cleanNodeWork) {
public RollingRestartStressTest cleanNodeWork(boolean clearNodeWork) {
this.clearNodeWork = clearNodeWork;
return this;
}
@ -222,7 +222,7 @@ public class RollingRestartStressTest {
XContentBuilder json = XContentFactory.jsonBuilder().startObject()
.field("field", "value" + ThreadLocalRandom.current().nextInt());
int fields = ThreadLocalRandom.current().nextInt() % numberOfFields;
int fields = Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfFields;
for (int i = 0; i < fields; i++) {
json.field("num_" + i, ThreadLocalRandom.current().nextDouble());
int tokens = ThreadLocalRandom.current().nextInt() % textTokens;