From 4ff1b429f1351c17ae5f3a4338a7ac33db49ef70 Mon Sep 17 00:00:00 2001
From: kimchy
Date: Mon, 1 Nov 2010 23:00:16 +0200
Subject: [PATCH] Possible (rare) shard index corruption / different doc count
 on recovery (gateway / shard), closes #466.

---
 .../checksum/ChecksumBenchmarkTest.java            |   65 +++++
 .../BlobReuseExistingNodeAllocation.java           |   20 +-
 .../local/LocalGatewayNodeAllocation.java          |    2 +-
 .../index/gateway/CommitPoint.java                 |   17 +-
 .../index/gateway/CommitPoints.java                |    8 +-
 .../blobstore/BlobStoreIndexShardGateway.java      |   42 +--
 .../gateway/local/LocalIndexShardGateway.java      |    2 +-
 .../recovery/RecoveryFileChunkRequest.java         |   18 +-
 .../index/shard/recovery/RecoverySource.java       |   20 +-
 .../index/shard/recovery/RecoveryTarget.java       |   12 +-
 .../shard/service/InternalIndexShard.java          |   13 +-
 .../org/elasticsearch/index/store/Store.java       |    7 +
 .../index/store/StoreFileMetaData.java             |   28 +-
 .../index/store/support/AbstractStore.java         |  117 ++++++++-
 .../TransportNodesListShardStoreMetaData.java      |   16 +-
 .../index/gateway/CommitPointsTests.java           |   10 +-
 .../fullrestart/FullRestartStressTest.java         |  239 ++++++++++++++++++
 .../RollingRestartStressTest.java                  |    4 +-
 18 files changed, 570 insertions(+), 70 deletions(-)
 create mode 100644 modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/checksum/ChecksumBenchmarkTest.java
 create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java
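In short: allocation and recovery previously decided that two copies of a store file were identical by comparing lengths alone, so two same-length-but-different files could survive recovery and yield diverging doc counts. This patch computes an MD5 checksum over the bytes as they are written to each store file, persists it in a ".cks" sidecar next to the file, and makes every comparison checksum-first with a length fallback. A standalone sketch of the scheme, JDK-only and with illustrative names (this is not code from the patch):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // The sidecar scheme in miniature: digest the bytes as they are written,
    // then persist the hex digest in a ".cks" file next to the data file. On
    // recovery, two copies are considered identical when their checksums
    // match, instead of trusting file length alone.
    public class ChecksumSidecarSketch {

        static String writeWithChecksum(Path file, byte[] data) throws IOException, NoSuchAlgorithmException {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            md5.update(data); // digest the written stream, no second read pass
            Files.write(file, data);
            String checksum = toHex(md5.digest());
            // the sidecar is only written after the data file is complete
            Files.write(file.resolveSibling(file.getFileName() + ".cks"),
                    checksum.getBytes(StandardCharsets.UTF_8));
            return checksum;
        }

        static String toHex(byte[] bytes) {
            StringBuilder sb = new StringBuilder(bytes.length * 2);
            for (byte b : bytes) {
                sb.append(Character.forDigit((b >> 4) & 0xf, 16)).append(Character.forDigit(b & 0xf, 16));
            }
            return sb.toString();
        }
    }

The ChecksumBenchmarkTest added below compares CRC32 and MD5 throughput over exactly this kind of batched update loop.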
diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/checksum/ChecksumBenchmarkTest.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/checksum/ChecksumBenchmarkTest.java
new file mode 100644
index 00000000000..5895c33a9e7
--- /dev/null
+++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/checksum/ChecksumBenchmarkTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.benchmark.checksum;
+
+import org.elasticsearch.common.Digest;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.security.MessageDigest;
+import java.util.zip.CRC32;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ChecksumBenchmarkTest {
+
+    public static final int BATCH_SIZE = 16 * 1024;
+
+    public static void main(String[] args) {
+        long dataSize = ByteSizeValue.parseBytesSizeValue("1g", null).bytes();
+        crc(dataSize);
+        md5(dataSize);
+    }
+
+    private static void crc(long dataSize) {
+        long start = System.currentTimeMillis();
+        CRC32 crc = new CRC32();
+        byte[] data = new byte[BATCH_SIZE];
+        long iter = dataSize / BATCH_SIZE;
+        for (long i = 0; i < iter; i++) {
+            crc.update(data);
+        }
+        crc.getValue();
+        System.out.println("CRC took " + new TimeValue(System.currentTimeMillis() - start));
+    }
+
+    private static void md5(long dataSize) {
+        long start = System.currentTimeMillis();
+        byte[] data = new byte[BATCH_SIZE];
+        long iter = dataSize / BATCH_SIZE;
+        MessageDigest digest = Digest.getMd5Digest();
+        for (long i = 0; i < iter; i++) {
+            digest.update(data);
+        }
+        digest.digest();
+        System.out.println("md5 took " + new TimeValue(System.currentTimeMillis() - start));
+    }
+}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
index d5753efd90a..07511bdbb92 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
@@ -174,27 +174,15 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
                         break;
                     }
 
-                    if (logger.isTraceEnabled()) {
-                        StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");
-                        sb.append(" gateway_files:\n");
-                        for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
-                            sb.append(" [").append(fileInfo.name()).append("]/[").append(fileInfo.physicalName()).append("], size [").append(new ByteSizeValue(fileInfo.length())).append("]\n");
-                        }
-                        sb.append(" node_files:\n");
-                        for (StoreFileMetaData md : storeFilesMetaData) {
-                            sb.append(" [").append(md.name()).append("], size [").append(new ByteSizeValue(md.length())).append("]\n");
-                        }
-                        logger.trace(sb.toString());
-                    }
                     long sizeMatched = 0;
                     for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
                         CommitPoint.FileInfo fileInfo = commitPoint.findPhysicalIndexFile(storeFileMetaData.name());
                         if (fileInfo != null) {
-                            if (fileInfo.length() == storeFileMetaData.length()) {
-                                logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway with size [{}]", shard, storeFileMetaData.name(), new ByteSizeValue(storeFileMetaData.length()));
+                            if (fileInfo.isSame(storeFileMetaData)) {
+                                logger.trace("{}: [{}] reusing file since it exists on remote node and on gateway", shard, storeFileMetaData.name());
                                 sizeMatched += storeFileMetaData.length();
                             } else {
-                                logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but has different size, remote node [{}], gateway [{}]", shard, storeFileMetaData.name(), storeFileMetaData.length(), fileInfo.length());
+                                logger.trace("{}: [{}] ignore file since it exists on remote node and on gateway but is different", shard, storeFileMetaData.name());
                             }
                         } else {
                             logger.trace("{}: [{}] exists on remote node, does not exists on gateway", shard, storeFileMetaData.name());
@@ -224,7 +212,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
 
                 long sizeMatched = 0;
                 for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
-                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) {
                         sizeMatched += storeFileMetaData.length();
                     }
                 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
index 8ccae2550c4..7fa4573ab79 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java
@@ -315,7 +315,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
 
                 long sizeMatched = 0;
                 for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
-                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) {
                         sizeMatched += storeFileMetaData.length();
                     }
                 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java
index 5b491556d6c..42f86e063a2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoint.java
@@ -20,7 +20,9 @@ package org.elasticsearch.index.gateway;
 
 import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.index.store.StoreFileMetaData;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 /**
@@ -34,11 +36,13 @@ public class CommitPoint {
         private final String name;
         private final String physicalName;
         private final long length;
+        private final String checksum;
 
-        public FileInfo(String name, String physicalName, long length) {
+        public FileInfo(String name, String physicalName, long length, String checksum) {
             this.name = name;
             this.physicalName = physicalName;
             this.length = length;
+            this.checksum = checksum;
         }
 
         public String name() {
@@ -52,6 +56,17 @@ public class CommitPoint {
         public long length() {
             return length;
         }
+
+        @Nullable public String checksum() {
+            return checksum;
+        }
+
+        public boolean isSame(StoreFileMetaData md) {
+            if (checksum != null && md.checksum() != null) {
+                return checksum.equals(md.checksum());
+            }
+            return length == md.length();
+        }
     }
 
     public static enum Type {
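A note on isSame(...): the checksum is only authoritative when both sides carry one; otherwise the comparison silently degrades to the pre-patch length equality, so old nodes and files written before this change keep working. A JDK-only sketch of the rule, with illustrative values (the same pattern appears again in StoreFileMetaData later in this patch):

    // Two-tier file identity check mirroring the isSame(...) methods in this
    // patch: compare checksums when both sides have one, otherwise fall back
    // to comparing lengths (the pre-patch behavior).
    public class IsSameSketch {

        static boolean isSame(String checksumA, long lengthA, String checksumB, long lengthB) {
            if (checksumA != null && checksumB != null) {
                return checksumA.equals(checksumB);
            }
            return lengthA == lengthB;
        }

        public static void main(String[] args) {
            // same length, different content: only caught when both checksums exist
            System.out.println(isSame("aabbccdd", 10, "11223344", 10)); // false
            System.out.println(isSame(null, 10, "11223344", 10));       // true (length fallback)
            System.out.println(isSame("aabbccdd", 10, "aabbccdd", 20)); // true (checksums win)
        }
    }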
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java
index 6b1c0889058..cefb41176b7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/CommitPoints.java
@@ -97,6 +97,9 @@ public class CommitPoints implements Iterable<CommitPoint> {
             builder.startObject(fileInfo.name());
             builder.field("physical_name", fileInfo.physicalName());
             builder.field("length", fileInfo.length());
+            if (fileInfo.checksum() != null) {
+                builder.field("checksum", fileInfo.checksum());
+            }
             builder.endObject();
         }
         builder.endObject();
@@ -147,6 +150,7 @@ public class CommitPoints implements Iterable<CommitPoint> {
                             String fileName = currentFieldName;
                             String physicalName = null;
                             long size = -1;
+                            String checksum = null;
                             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                                 if (token == XContentParser.Token.FIELD_NAME) {
                                     currentFieldName = parser.currentName();
@@ -155,6 +159,8 @@ public class CommitPoints implements Iterable<CommitPoint> {
                                         physicalName = parser.text();
                                     } else if ("length".equals(currentFieldName)) {
                                         size = parser.longValue();
+                                    } else if ("checksum".equals(currentFieldName)) {
+                                        checksum = parser.text();
                                     }
                                 }
                             }
@@ -164,7 +170,7 @@ public class CommitPoints implements Iterable<CommitPoint> {
                             if (size == -1) {
                                 throw new IOException("Malformed commit, missing length for [" + fileName + "]");
                             }
-                            files.add(new CommitPoint.FileInfo(fileName, physicalName, size));
+                            files.add(new CommitPoint.FileInfo(fileName, physicalName, size, checksum));
                         }
                     }
                 } else if (token.isValue()) {
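Serialized commit points therefore gain an optional checksum field per file entry. Roughly, one file entry now looks like this (illustrative values reconstructed from the builder calls above; the enclosing commit-point object is omitted, and translog entries simply leave the field out):

    "__1" : {
        "physical_name" : "_0.cfs",
        "length" : 12345,
        "checksum" : "3e25960a79dbc69b674cd4ec67a72c62"
    }

Because the parser treats the field as optional, commit points written before this patch still deserialize, with checksum left null.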
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
index d95049cb3f5..536f1011cbb 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java
@@ -43,6 +43,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
 import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogStreams;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -181,9 +182,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         int indexNumberOfFiles = 0;
         long indexTotalFilesSize = 0;
         for (final String fileName : snapshotIndexCommit.getFiles()) {
-            long fileLength = 0;
+            StoreFileMetaData md;
             try {
-                fileLength = store.directory().fileLength(fileName);
+                md = store.metaData(fileName);
             } catch (IOException e) {
                 throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to get store file metadata", e);
             }
@@ -194,17 +195,17 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
             }
 
             CommitPoint.FileInfo fileInfo = commitPoints.findPhysicalIndexFile(fileName);
-            if (fileInfo == null || fileInfo.length() != fileLength || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
+            if (fileInfo == null || !fileInfo.isSame(md) || !commitPointFileExistsInBlobs(fileInfo, blobs)) {
                 // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
                 snapshotRequired = true;
             }
 
             if (snapshotRequired) {
                 indexNumberOfFiles++;
-                indexTotalFilesSize += fileLength;
+                indexTotalFilesSize += md.length();
                 // create a new FileInfo
                 try {
-                    CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), fileName, fileLength);
+                    CommitPoint.FileInfo snapshotFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), fileName, md.length(), md.checksum());
                     indexCommitPointFiles.add(snapshotFileInfo);
                     snapshotFile(snapshotIndexCommit.getDirectory(), snapshotFileInfo, indexLatch, failures);
                 } catch (IOException e) {
@@ -280,7 +281,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
             currentSnapshotStatus.translog().expectedNumberOfOperations(expectedNumberOfOperations);
 
             if (snapshotRequired) {
-                CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes());
+                CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes(), null /* no need for checksum in translog */);
                 translogCommitPointFiles.add(addedTranslogFileInfo);
                 try {
                     snapshotTranslog(translogSnapshot, addedTranslogFileInfo);
@@ -520,26 +521,26 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
         List<CommitPoint.FileInfo> filesToRecover = Lists.newArrayList();
         for (CommitPoint.FileInfo fileInfo : commitPoint.indexFiles()) {
             String fileName = fileInfo.physicalName();
-            long fileLength = -1;
+            StoreFileMetaData md = null;
             try {
-                fileLength = store.directory().fileLength(fileName);
+                md = store.metaData(fileName);
             } catch (Exception e) {
                 // no file
             }
-            if (!fileName.contains("segment") && fileLength == fileInfo.length()) {
+            if (!fileName.contains("segment") && md != null && fileInfo.isSame(md)) {
                 numberOfFiles++;
-                totalSize += fileLength;
+                totalSize += md.length();
                 numberOfReusedFiles++;
-                reusedTotalSize += fileLength;
+                reusedTotalSize += md.length();
                 if (logger.isTraceEnabled()) {
-                    logger.trace("not_recovering [{}], exists in local store and has same length [{}]", fileInfo.physicalName(), fileInfo.length());
+                    logger.trace("not_recovering [{}], exists in local store and is same", fileInfo.physicalName());
                 }
             } else {
                 if (logger.isTraceEnabled()) {
-                    if (fileLength == -1) {
+                    if (md == null) {
                         logger.trace("recovering [{}], does not exists in local store", fileInfo.physicalName());
                     } else {
-                        logger.trace("recovering [{}], exists in local store but has different length: gateway [{}], local [{}]", fileInfo.physicalName(), fileInfo.length(), fileLength);
+                        logger.trace("recovering [{}], exists in local store but is different", fileInfo.physicalName());
                     }
                 }
                 numberOfFiles++;
@@ -591,12 +592,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
                 if (!commitPoint.containPhysicalIndexFile(storeFile)) {
                     try {
                         store.directory().deleteFile(storeFile);
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         // ignore
                     }
                 }
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             // ignore
         }
     }
@@ -604,7 +605,9 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
     private void recoverFile(final CommitPoint.FileInfo fileInfo, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {
         final IndexOutput indexOutput;
         try {
-            indexOutput = store.directory().createOutput(fileInfo.physicalName());
+            // we create an output with no checksum, since the raw bytes being copied are not the same as the
+            // stream the checksum was computed over (because of seeks). We write the checksum file once copying is done
+            indexOutput = store.createOutputWithNoChecksum(fileInfo.physicalName());
        } catch (IOException e) {
             failures.add(e);
             latch.countDown();
@@ -641,6 +644,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
                     // we are done...
                     try {
                         indexOutput.close();
+                        // write the checksum
+                        if (fileInfo.checksum() != null) {
+                            store.writeChecksum(fileInfo.physicalName(), fileInfo.checksum());
+                        }
+                        store.directory().sync(fileInfo.physicalName());
                     } catch (IOException e) {
                         onFailure(e);
                         return;
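Note the ordering recoverFile now follows: copy the raw bytes without digesting (the stored checksum describes the original writer's stream, which a raw copy cannot recompute when seeks were involved), close and sync the data file, then write the known checksum sidecar. A JDK-only sketch of that ordering, with illustrative names (JDK 7+ APIs used for brevity):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Ordering used when recovering a file whose checksum is already known:
    // copy the raw bytes first, sync, then persist the sidecar. A missing or
    // partial sidecar therefore never describes a file that was not fully
    // copied; the worst case is a complete file with no checksum, which
    // safely degrades to the old length-based comparison.
    public class RecoverFileSketch {

        static void recover(Path dataFile, byte[][] chunks, String knownChecksum) throws IOException {
            try (FileChannel ch = FileChannel.open(dataFile,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
                for (byte[] chunk : chunks) {
                    ch.write(ByteBuffer.wrap(chunk)); // raw copy, no digesting
                }
                ch.force(true); // sync the data file before admitting the checksum
            }
            if (knownChecksum != null) {
                Path sidecar = dataFile.resolveSibling(dataFile.getFileName() + ".cks");
                Files.write(sidecar, knownChecksum.getBytes(StandardCharsets.UTF_8));
            }
        }
    }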
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java
index aef2550bd1c..123e04fe4a7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java
@@ -81,7 +81,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
     }
 
     @Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
-        recoveryStatus().index().startTime(System.currentTimeMillis());
+        recoveryStatus.index().startTime(System.currentTimeMillis());
         long version = -1;
         try {
             if (IndexReader.indexExists(indexShard.store().directory())) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java
index 584a7a57486..14b489942f8 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFileChunkRequest.java
@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.index.shard.ShardId;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -35,17 +36,19 @@ class RecoveryFileChunkRequest implements Streamable {
     private String name;
     private long position;
     private long length;
+    private String checksum;
     private byte[] content;
     private int contentLength;
 
     RecoveryFileChunkRequest() {
     }
 
-    RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, byte[] content, int contentLength) {
+    RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, byte[] content, int contentLength) {
         this.shardId = shardId;
         this.name = name;
         this.position = position;
         this.length = length;
+        this.checksum = checksum;
         this.content = content;
         this.contentLength = contentLength;
     }
@@ -62,6 +65,10 @@ class RecoveryFileChunkRequest implements Streamable {
         return position;
     }
 
+    @Nullable public String checksum() {
+        return this.checksum;
+    }
+
     public long length() {
         return length;
     }
@@ -85,6 +92,9 @@ class RecoveryFileChunkRequest implements Streamable {
         name = in.readUTF();
         position = in.readVLong();
         length = in.readVLong();
+        if (in.readBoolean()) {
+            checksum = in.readUTF();
+        }
         contentLength = in.readVInt();
         content = new byte[contentLength];
         in.readFully(content);
@@ -95,6 +105,12 @@ class RecoveryFileChunkRequest implements Streamable {
         out.writeUTF(name);
         out.writeVLong(position);
         out.writeVLong(length);
+        if (checksum == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeUTF(checksum);
+        }
         out.writeVInt(contentLength);
         out.writeBytes(content, 0, contentLength);
     }
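The optional checksum travels with every chunk as a presence flag followed by the value, mirroring StoreFileMetaData's own serialization below; reader and writer must apply the fields in the same order. The same pattern with plain JDK streams (DataOutputStream standing in for StreamOutput; this is a sketch, not the Streamable API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Presence-flag encoding for an optional string field, as used by
    // RecoveryFileChunkRequest: a boolean tells the reader whether a value
    // follows.
    public class OptionalFieldSketch {

        static void write(DataOutputStream out, String checksum) throws IOException {
            if (checksum == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(checksum);
            }
        }

        static String read(DataInputStream in) throws IOException {
            return in.readBoolean() ? in.readUTF() : null;
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(out, "3e25960a79dbc69b674cd4ec67a72c62");
                write(out, null);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                System.out.println(read(in)); // the checksum
                System.out.println(read(in)); // null
            }
        }
    }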
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
index 93082a184be..f999847ea98 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java
@@ -35,6 +35,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.service.InternalIndexShard;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -96,28 +97,28 @@ public class RecoverySource extends AbstractComponent {
 
                 StopWatch stopWatch = new StopWatch().start();
 
                 for (String name : snapshot.getFiles()) {
-                    long length = shard.store().directory().fileLength(name);
+                    StoreFileMetaData md = shard.store().metaData(name);
                     boolean useExisting = false;
                     if (request.existingFiles().containsKey(name)) {
-                        if (!name.contains("segment") && length == request.existingFiles().get(name).length()) {
+                        if (!name.contains("segment") && md.isSame(request.existingFiles().get(name))) {
                             response.phase1ExistingFileNames.add(name);
-                            response.phase1ExistingFileSizes.add(length);
-                            existingTotalSize += length;
+                            response.phase1ExistingFileSizes.add(md.length());
+                            existingTotalSize += md.length();
                             useExisting = true;
                             if (logger.isTraceEnabled()) {
-                                logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, length);
+                                logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.checksum(), md.length());
                             }
                         }
                     }
                     if (!useExisting) {
                         if (request.existingFiles().containsKey(name)) {
-                            logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but has different length: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name).length(), length);
+                            logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name), md);
                         } else {
                             logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
                         }
                         response.phase1FileNames.add(name);
-                        response.phase1FileSizes.add(length);
-                        totalSize += length;
+                        response.phase1FileSizes.add(md.length());
+                        totalSize += md.length();
                     }
                 }
                 response.phase1TotalSize = totalSize;
@@ -138,6 +139,7 @@ public class RecoverySource extends AbstractComponent {
                             try {
                                 final int BUFFER_SIZE = (int) fileChunkSize.bytes();
                                 byte[] buf = new byte[BUFFER_SIZE];
+                                StoreFileMetaData md = shard.store().metaData(name);
                                 indexInput = snapshot.getDirectory().openInput(name);
                                 long len = indexInput.length();
                                 long readCount = 0;
@@ -148,7 +150,7 @@ public class RecoverySource extends AbstractComponent {
                                     int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
                                     long position = indexInput.getFilePointer();
                                     indexInput.readBytes(buf, 0, toRead, false);
-                                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead),
+                                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
                                             TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
                                     readCount += toRead;
                                 }
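Phase 1 resolves each file's metadata once and tags every outgoing chunk with the file's checksum, so the target can persist it when the final chunk lands. The chunking arithmetic itself, reduced to a JDK-only sketch (the transport call is stubbed out; names illustrative):

    import java.util.Arrays;

    // The chunking loop from phase 1 of recovery, reduced to its arithmetic:
    // read bufferSize bytes at a time, shrinking the final chunk to the
    // remainder, and ship each chunk together with its position in the file.
    public class ChunkLoopSketch {

        static void sendInChunks(byte[] file, int bufferSize) {
            long len = file.length;
            long readCount = 0;
            while (readCount < len) {
                int toRead = readCount + bufferSize > len ? (int) (len - readCount) : bufferSize;
                long position = readCount;
                byte[] chunk = Arrays.copyOfRange(file, (int) position, (int) position + toRead);
                ship(position, chunk); // stand-in for transportService.submitRequest(...)
                readCount += toRead;
            }
        }

        static void ship(long position, byte[] chunk) {
            System.out.println("chunk at " + position + ", " + chunk.length + " bytes");
        }

        public static void main(String[] args) {
            sendInChunks(new byte[10000], 4096); // chunks of 4096, 4096, 1808
        }
    }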
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java
index a2e78807911..19d2501b273 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java
@@ -364,7 +364,7 @@ public class RecoveryTarget extends AbstractComponent {
                     if (!request.snapshotFiles().contains(existingFile)) {
                         try {
                             shard.store().directory().deleteFile(existingFile);
-                        } catch (IOException e) {
+                        } catch (Exception e) {
                             // ignore, we don't really care, will get deleted later on
                         }
                     }
@@ -398,7 +398,9 @@ public class RecoveryTarget extends AbstractComponent {
                             // ignore
                         }
                     }
-                    indexOutput = shard.store().directory().createOutput(request.name());
+                    // we create an output with no checksum, since the raw bytes being copied are not the same as the
+                    // stream the checksum was computed over (because of seeks). We write the checksum file once copying is done
+                    indexOutput = shard.store().createOutputWithNoChecksum(request.name());
                     onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
                 } else {
                     indexOutput = onGoingRecovery.openIndexOutputs.get(request.name());
@@ -414,6 +416,11 @@ public class RecoveryTarget extends AbstractComponent {
                     if (indexOutput.getFilePointer() == request.length()) {
                         // we are done
                         indexOutput.close();
+                        // write the checksum
+                        if (request.checksum() != null) {
+                            shard.store().writeChecksum(request.name(), request.checksum());
+                        }
+                        shard.store().directory().sync(request.name());
                         onGoingRecovery.openIndexOutputs.remove(request.name());
                     }
                 } catch (IOException e) {
@@ -423,6 +430,7 @@ public class RecoveryTarget extends AbstractComponent {
                     } catch (IOException e1) {
                         // ignore
                     }
+                    throw e;
                 }
             }
             channel.sendResponse(VoidStreamable.INSTANCE);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
index fc9142f712d..d67c90f0c92 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java
@@ -21,6 +21,7 @@ package org.elasticsearch.index.shard.service;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.FilterClause;
@@ -188,10 +189,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
             if (state == IndexShardState.RELOCATED) {
                 throw new IndexShardRelocatedException(shardId);
             }
-            engine.start();
             if (checkIndex) {
                 checkIndex(true);
             }
+            engine.start();
             scheduleRefresherIfNeeded();
             logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
             state = IndexShardState.STARTED;
@@ -436,6 +437,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
             if (state != IndexShardState.RECOVERING) {
                 throw new IndexShardNotRecoveringException(shardId, state);
             }
+            // also check here, before we apply the translog
+            if (checkIndex) {
+                checkIndex(true);
+            }
             engine.start();
         }
 
@@ -455,9 +460,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         if (withFlush) {
             engine.flush(new Engine.Flush());
         }
-        if (checkIndex) {
-            checkIndex(true);
-        }
         synchronized (mutex) {
             logger.debug("state: [{}]->[{}]", state, IndexShardState.STARTED);
             state = IndexShardState.STARTED;
@@ -576,6 +578,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
 
     private void checkIndex(boolean throwException) throws IndexShardException {
         try {
+            if (!IndexReader.indexExists(store.directory())) {
+                return;
+            }
             CheckIndex checkIndex = new CheckIndex(store.directory());
             FastByteArrayOutputStream os = new FastByteArrayOutputStream();
             PrintStream out = new PrintStream(os);
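Relocating checkIndex(true) so it runs before engine.start() is the operational half of this fix: once the engine starts, translog replay proceeds on top of whatever segments are on disk, so corruption must be detected before that, not after. The new IndexReader.indexExists(...) guard skips the check for a shard that has no index on disk yet. A standalone sketch of that gate (Lucene 3.x-era API, matching this codebase; the directory path is illustrative):

    import java.io.File;
    import java.io.PrintStream;
    import org.apache.lucene.index.CheckIndex;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    // Run Lucene's CheckIndex as a gate before opening an index for writing,
    // the way the shard now does before starting its engine.
    public class CheckIndexSketch {
        public static void main(String[] args) throws Exception {
            Directory dir = FSDirectory.open(new File(args[0]));
            if (!IndexReader.indexExists(dir)) {
                return; // nothing on disk yet, nothing to check (same guard as the patch)
            }
            CheckIndex checkIndex = new CheckIndex(dir);
            checkIndex.setInfoStream(new PrintStream(System.out));
            CheckIndex.Status status = checkIndex.checkIndex();
            if (!status.clean) {
                throw new IllegalStateException("index is corrupt, refusing to start");
            }
        }
    }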
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
index 8d916ae4a4d..7dd7c827721 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java
@@ -20,6 +20,7 @@ package org.elasticsearch.index.store;
 
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.shard.IndexShardComponent;
@@ -36,6 +37,12 @@ public interface Store extends IndexShardComponent {
      */
     Directory directory();
 
+    IndexOutput createOutputWithNoChecksum(String name) throws IOException;
+
+    void writeChecksum(String name, String checksum) throws IOException;
+
+    StoreFileMetaData metaData(String name) throws IOException;
+
     ImmutableMap<String, StoreFileMetaData> list() throws IOException;
 
     /**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
index af98cc613d7..841c3ca8053 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/StoreFileMetaData.java
@@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -36,13 +37,16 @@ public class StoreFileMetaData implements Streamable {
 
     private long length;
 
+    private String checksum;
+
     StoreFileMetaData() {
     }
 
-    public StoreFileMetaData(String name, long length, long lastModified) {
+    public StoreFileMetaData(String name, long length, long lastModified, String checksum) {
         this.name = name;
         this.lastModified = lastModified;
         this.length = length;
+        this.checksum = checksum;
     }
 
     public String name() {
@@ -57,6 +61,17 @@ public class StoreFileMetaData implements Streamable {
         return length;
     }
 
+    @Nullable public String checksum() {
+        return this.checksum;
+    }
+
+    public boolean isSame(StoreFileMetaData other) {
+        if (checksum != null && other.checksum != null) {
+            return checksum.equals(other.checksum);
+        }
+        return length == other.length;
+    }
+
     public static StoreFileMetaData readStoreFileMetaData(StreamInput in) throws IOException {
         StoreFileMetaData md = new StoreFileMetaData();
         md.readFrom(in);
@@ -64,16 +79,25 @@ public class StoreFileMetaData implements Streamable {
     }
 
     @Override public String toString() {
-        return "name [" + name + "], length [" + length + "]";
+        return "name [" + name + "], length [" + length + "], checksum [" + checksum + "]";
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
         name = in.readUTF();
         length = in.readVLong();
+        if (in.readBoolean()) {
+            checksum = in.readUTF();
+        }
     }
 
     @Override public void writeTo(StreamOutput out) throws IOException {
         out.writeUTF(name);
         out.writeVLong(length);
+        if (checksum == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeUTF(checksum);
+        }
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
index 047b2b8a0f2..9d5577a78c7 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
@@ -20,7 +20,10 @@ package org.elasticsearch.index.store.support;
 
 import org.apache.lucene.store.*;
+import org.elasticsearch.common.Digest;
+import org.elasticsearch.common.Hex;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.lucene.Directories;
@@ -35,6 +38,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.security.MessageDigest;
 import java.util.Map;
 
 /**
@@ -73,7 +77,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         return builder.build();
     }
 
-    private StoreFileMetaData metaData(String name) throws IOException {
+    public StoreFileMetaData metaData(String name) throws IOException {
         StoreFileMetaData md = filesMetadata.get(name);
         if (md == null) {
             return null;
@@ -108,6 +112,24 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         directory().close();
     }
 
+    @Override public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
+        return ((StoreDirectory) directory()).createOutput(name, false);
+    }
+
+    @Override public void writeChecksum(String name, String checksum) throws IOException {
+        // write the checksum (using the delegate, so we won't checksum this one as well...)
+        IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
+        byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
+        checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
+        checkSumOutput.close();
+        // update the metadata to include the checksum
+        synchronized (mutex) {
+            StoreFileMetaData metaData = filesMetadata.get(name);
+            metaData = new StoreFileMetaData(metaData.name(), metaData.length(), metaData.lastModified(), checksum);
+            filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
+        }
+    }
+
     /**
      * The idea of the store directory is to cache file level meta data, as well as md5 of it
      */
@@ -120,13 +142,34 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
             synchronized (mutex) {
                 MapBuilder<String, StoreFileMetaData> builder = MapBuilder.newMapBuilder();
                 for (String file : delegate.listAll()) {
-                    builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file)));
+                    if (file.endsWith(".cks")) { // ignore checksum files here
+                        continue;
+                    }
+                    // try and load the checksum for the file
+                    String checksum = null;
+                    if (delegate.fileExists(file + ".cks")) {
+                        IndexInput indexInput = delegate.openInput(file + ".cks");
+                        try {
+                            if (indexInput.length() > 0) {
+                                byte[] checksumBytes = new byte[(int) indexInput.length()];
+                                indexInput.readBytes(checksumBytes, 0, checksumBytes.length, false);
+                                checksum = Unicode.fromBytes(checksumBytes);
+                            }
+                        } finally {
+                            indexInput.close();
+                        }
+                    }
+                    builder.put(file, new StoreFileMetaData(file, delegate.fileLength(file), delegate.fileModified(file), checksum));
                 }
                 filesMetadata = builder.immutableMap();
                 files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
             }
         }
 
+        public Directory delegate() {
+            return delegate;
+        }
+
         @Override public String[] listAll() throws IOException {
             return files;
         }
@@ -152,7 +195,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
             synchronized (mutex) {
                 StoreFileMetaData metaData = filesMetadata.get(name);
                 if (metaData != null) {
-                    metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name));
+                    metaData = new StoreFileMetaData(metaData.name(), metaData.length(), delegate.fileModified(name), metaData.checksum());
                     filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
                 }
             }
@@ -160,6 +203,11 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
 
         @Override public void deleteFile(String name) throws IOException {
             delegate.deleteFile(name);
+            try {
+                delegate.deleteFile(name + ".cks");
+            } catch (Exception e) {
+                // ignore
+            }
             synchronized (mutex) {
                 filesMetadata = MapBuilder.newMapBuilder(filesMetadata).remove(name).immutableMap();
                 files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
@@ -179,13 +227,23 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         }
 
         @Override public IndexOutput createOutput(String name) throws IOException {
+            return createOutput(name, true);
+        }
+
+        public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
             IndexOutput out = delegate.createOutput(name);
+            // delete the relevant cks file for an existing file, if exists
+            try {
+                delegate.deleteFile(name + ".cks");
+            } catch (Exception e) {
+                // ignore
+            }
             synchronized (mutex) {
-                StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1);
+                StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null);
                 filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
                 files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
             }
-            return new StoreIndexOutput(out, name);
+            return new StoreIndexOutput(out, name, computeChecksum);
         }
 
         @Override public IndexInput openInput(String name) throws IOException {
@@ -227,29 +285,63 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         @Override public void sync(String name) throws IOException {
             if (sync) {
                 delegate.sync(name);
+                try {
+                    if (delegate.fileExists(name + ".cks")) {
+                        delegate.sync(name + ".cks");
+                    }
+                } catch (Exception e) {
+                    //ignore
+                }
             }
         }
 
         @Override public void forceSync(String name) throws IOException {
             delegate.sync(name);
+            try {
+                if (delegate.fileExists(name + ".cks")) {
+                    delegate.sync(name + ".cks");
+                }
+            } catch (Exception e) {
+                //ignore
+            }
         }
     }
 
-    private class StoreIndexOutput extends IndexOutput {
+    class StoreIndexOutput extends IndexOutput {
 
         private final IndexOutput delegate;
 
         private final String name;
 
-        private StoreIndexOutput(IndexOutput delegate, String name) {
+        private final MessageDigest digest;
+
+        StoreIndexOutput(IndexOutput delegate, String name, boolean computeChecksum) {
             this.delegate = delegate;
             this.name = name;
+            if (computeChecksum) {
+                if ("segments.gen".equals(name)) {
+                    // no need to create a checksum for segments.gen, since it's not snapshotted on recovery
+                    this.digest = null;
+                } else {
+                    this.digest = Digest.getMd5Digest();
+                }
+            } else {
+                this.digest = null;
+            }
         }
 
         @Override public void close() throws IOException {
             delegate.close();
+            String checksum = null;
+            if (digest != null) {
+                checksum = Hex.encodeHexString(digest.digest());
+                IndexOutput checkSumOutput = ((StoreDirectory) directory()).delegate().createOutput(name + ".cks");
+                byte[] checksumBytes = Unicode.fromStringAsBytes(checksum);
+                checkSumOutput.writeBytes(checksumBytes, checksumBytes.length);
+                checkSumOutput.close();
+            }
             synchronized (mutex) {
-                StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name));
+                StoreFileMetaData md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), checksum);
                 filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
                 files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
             }
@@ -257,10 +349,16 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
 
         @Override public void writeByte(byte b) throws IOException {
             delegate.writeByte(b);
+            if (digest != null) {
+                digest.update(b);
+            }
         }
 
         @Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
             delegate.writeBytes(b, offset, length);
+            if (digest != null) {
+                digest.update(b, offset, length);
+            }
         }
 
         // don't override it, base class method simple reads from input and writes to this output
@@ -277,6 +375,9 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         }
 
         @Override public void seek(long pos) throws IOException {
+            // seek might be called while writing a file, which means the checksum is not a checksum of the
+            // final file bytes, but of the bytes in the order they were written to this stream; that order
+            // is the same for each type of file in Lucene, so the checksums remain comparable
            delegate.seek(pos);
         }
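StoreIndexOutput keeps the MD5 digest in step with every writeByte/writeBytes call, so closing the file yields a checksum with no second read pass over the data. The JDK ships the same pattern as java.security.DigestOutputStream; a minimal sketch (note the caveat from the seek() comment above: a stream digest covers bytes in write order, not necessarily the final file image — OutputStream cannot seek, so the issue does not arise in this sketch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Digest-as-you-write: the same pattern StoreIndexOutput implements by
    // hand on top of Lucene's IndexOutput.
    public class DigestOnWriteSketch {

        public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            ByteArrayOutputStream file = new ByteArrayOutputStream();
            try (DigestOutputStream out = new DigestOutputStream(file, md5)) {
                out.write("some segment data".getBytes(StandardCharsets.UTF_8));
            } // once closed, the digest is ready without re-reading the file
            StringBuilder hex = new StringBuilder();
            for (byte b : md5.digest()) {
                hex.append(String.format("%02x", b));
            }
            System.out.println("checksum: " + hex); // this is what goes into the .cks sidecar
        }
    }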
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
index e837a303bc4..b06cbbbb788 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
@@ -27,10 +27,12 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.Lists;
 import org.elasticsearch.common.collect.Maps;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -157,7 +159,19 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
         }
         Map<String, StoreFileMetaData> files = Maps.newHashMap();
         for (File file : indexFile.listFiles()) {
-            files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
+            if (file.getName().endsWith(".cks")) {
+                continue;
+            }
+            // try and load the checksum
+            String checksum = null;
+            File checksumFile = new File(file.getParentFile(), file.getName() + ".cks");
+            if (checksumFile.exists()) {
+                byte[] checksumBytes = Streams.copyToByteArray(checksumFile);
+                if (checksumBytes.length > 0) {
+                    checksum = Unicode.fromBytes(checksumBytes);
+                }
+            }
+            files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum));
        }
         return new StoreFilesMetaData(false, shardId, files);
     }
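This listing path runs on a node before the shard's Store is instantiated, so it reads the .cks sidecars straight off the filesystem with plain java.io. A JDK-only equivalent (directory layout illustrative):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;

    // List a shard directory the way TransportNodesListShardStoreMetaData
    // does: skip the sidecars themselves, and attach the sidecar's content
    // (the hex checksum) to the data file it describes when one is present.
    public class ListWithChecksumsSketch {

        static Map<String, String> list(Path shardDir) throws IOException {
            Map<String, String> checksums = new HashMap<>();
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(shardDir)) {
                for (Path file : stream) {
                    String fileName = file.getFileName().toString();
                    if (fileName.endsWith(".cks")) {
                        continue; // metadata about another file, not a store file
                    }
                    Path sidecar = file.resolveSibling(fileName + ".cks");
                    String checksum = null;
                    if (Files.exists(sidecar) && Files.size(sidecar) > 0) {
                        checksum = new String(Files.readAllBytes(sidecar), StandardCharsets.UTF_8);
                    }
                    checksums.put(fileName, checksum); // null means "length check only"
                }
            }
            return checksums;
        }
    }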
diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java
index abcf7e19627..59623fcf298 100644
--- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java
+++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/gateway/CommitPointsTests.java
@@ -38,12 +38,12 @@ public class CommitPointsTests {
 
     @Test public void testCommitPointXContent() throws Exception {
         ArrayList<CommitPoint.FileInfo> indexFiles = Lists.newArrayList();
-        indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100));
-        indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200));
+        indexFiles.add(new CommitPoint.FileInfo("file1", "file1_p", 100, "ck1"));
+        indexFiles.add(new CommitPoint.FileInfo("file2", "file2_p", 200, "ck2"));
 
         ArrayList<CommitPoint.FileInfo> translogFiles = Lists.newArrayList();
-        translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100));
-        translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200));
+        translogFiles.add(new CommitPoint.FileInfo("t_file1", "t_file1_p", 100, null));
+        translogFiles.add(new CommitPoint.FileInfo("t_file2", "t_file2_p", 200, null));
 
         CommitPoint commitPoint = new CommitPoint(1, "test", CommitPoint.Type.GENERATED, indexFiles, translogFiles);
 
@@ -59,6 +59,7 @@ public class CommitPointsTests {
             assertThat(desCp.indexFiles().get(i).name(), equalTo(commitPoint.indexFiles().get(i).name()));
             assertThat(desCp.indexFiles().get(i).physicalName(), equalTo(commitPoint.indexFiles().get(i).physicalName()));
             assertThat(desCp.indexFiles().get(i).length(), equalTo(commitPoint.indexFiles().get(i).length()));
+            assertThat(desCp.indexFiles().get(i).checksum(), equalTo(commitPoint.indexFiles().get(i).checksum()));
         }
 
         assertThat(desCp.translogFiles().size(), equalTo(commitPoint.translogFiles().size()));
@@ -66,6 +67,7 @@ public class CommitPointsTests {
             assertThat(desCp.translogFiles().get(i).name(), equalTo(commitPoint.translogFiles().get(i).name()));
             assertThat(desCp.translogFiles().get(i).physicalName(), equalTo(commitPoint.translogFiles().get(i).physicalName()));
             assertThat(desCp.translogFiles().get(i).length(), equalTo(commitPoint.translogFiles().get(i).length()));
+            assertThat(desCp.translogFiles().get(i).checksum(), nullValue());
         }
     }
 }
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java
new file mode 100644
index 00000000000..492f61b45fa
--- /dev/null
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.stress.fullrestart;
+
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.common.UUID;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.node.internal.InternalNode;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class FullRestartStressTest {
+
+    private final ESLogger logger = Loggers.getLogger(getClass());
+
+    private int numberOfNodes = 4;
+
+    private boolean clearNodeWork = false;
+
+    private int numberOfIndices = 5;
+    private int textTokens = 150;
+    private int numberOfFields = 10;
+    private int bulkSize = 1000;
+    private int numberOfDocsPerRound = 50000;
+
+    private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
+
+    private TimeValue period = TimeValue.timeValueMinutes(20);
+
+    private AtomicLong indexCounter = new AtomicLong();
+
+    public FullRestartStressTest numberOfNodes(int numberOfNodes) {
+        this.numberOfNodes = numberOfNodes;
+        return this;
+    }
+
+    public FullRestartStressTest numberOfIndices(int numberOfIndices) {
+        this.numberOfIndices = numberOfIndices;
+        return this;
+    }
+
+    public FullRestartStressTest textTokens(int textTokens) {
+        this.textTokens = textTokens;
+        return this;
+    }
+
+    public FullRestartStressTest numberOfFields(int numberOfFields) {
+        this.numberOfFields = numberOfFields;
+        return this;
+    }
+
+    public FullRestartStressTest bulkSize(int bulkSize) {
+        this.bulkSize = bulkSize;
+        return this;
+    }
+
+    public FullRestartStressTest numberOfDocsPerRound(int numberOfDocsPerRound) {
+        this.numberOfDocsPerRound = numberOfDocsPerRound;
+        return this;
+    }
+
+    public FullRestartStressTest settings(Settings settings) {
+        this.settings = settings;
+        return this;
+    }
+
+    public FullRestartStressTest period(TimeValue period) {
+        this.period = period;
+        return this;
+    }
+
+    public FullRestartStressTest clearNodeWork(boolean clearNodeWork) {
+        this.clearNodeWork = clearNodeWork;
+        return this;
+    }
+
+    public void run() throws Exception {
+        long numberOfRounds = 0;
+        long testStart = System.currentTimeMillis();
+        while (true) {
+            Node[] nodes = new Node[numberOfNodes];
+            for (int i = 0; i < nodes.length; i++) {
+                nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node();
+            }
+            Node client = NodeBuilder.nodeBuilder().settings(settings).client(true).node();
+
+            // verify that the indices are there
+            for (int i = 0; i < numberOfIndices; i++) {
+                try {
+                    client.client().admin().indices().prepareCreate("test" + i).execute().actionGet();
+                } catch (Exception e) {
+                    // might already exist, fine
+                }
+            }
+
+            logger.info("*** Waiting for GREEN status");
+            try {
+                ClusterHealthResponse clusterHealth = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
+                if (clusterHealth.timedOut()) {
+                    logger.warn("timed out waiting for green status....");
+                }
+            } catch (Exception e) {
+                logger.warn("failed to execute cluster health....");
+            }
+
+            CountResponse count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
+            logger.info("*** index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
+            // verify count
+            for (int i = 0; i < (nodes.length * 5); i++) {
+                count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
+                logger.debug("index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
+                if (count.count() != indexCounter.get()) {
+                    logger.warn("!!! count does not match, index_count [{}], expected_count [{}]", count.count(), indexCounter.get());
+                    throw new Exception("failed test, count does not match...");
+                }
+            }
+
+            // verify search
+            for (int i = 0; i < (nodes.length * 5); i++) {
+                // do a search with norms field, so we don't rely on match all filtering cache
+                SearchResponse search = client.client().prepareSearch().setQuery(matchAllQuery().normsField("field")).execute().actionGet();
+                logger.debug("search_count [{}], expected_count [{}]", search.hits().totalHits(), indexCounter.get());
+                if (search.hits().totalHits() != indexCounter.get()) {
+                    logger.warn("!!! search does not match, search_count [{}], expected_count [{}]", search.hits().totalHits(), indexCounter.get());
+                    throw new Exception("failed test, search count does not match...");
+                }
+            }
+
+            logger.info("*** ROUND {}", ++numberOfRounds);
+            // bulk index data
+            int numberOfBulks = numberOfDocsPerRound / bulkSize;
+            for (int b = 0; b < numberOfBulks; b++) {
+                BulkRequestBuilder bulk = client.client().prepareBulk();
+                for (int k = 0; k < bulkSize; k++) {
+                    StringBuffer sb = new StringBuffer();
+                    XContentBuilder json = XContentFactory.jsonBuilder().startObject()
+                            .field("field", "value" + ThreadLocalRandom.current().nextInt());
+
+                    int fields = Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfFields;
+                    for (int i = 0; i < fields; i++) {
+                        json.field("num_" + i, ThreadLocalRandom.current().nextDouble());
+                        int tokens = Math.abs(ThreadLocalRandom.current().nextInt()) % textTokens;
+                        sb.setLength(0);
+                        for (int j = 0; j < tokens; j++) {
+                            sb.append(UUID.randomBase64UUID()).append(' ');
+                        }
+                        json.field("text_" + i, sb.toString());
+                    }
+
+                    json.endObject();
+
+                    bulk.add(Requests.indexRequest("test" + (Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfIndices)).type("type1").source(json));
+                    indexCounter.incrementAndGet();
+                }
+                bulk.execute().actionGet();
+            }
+
+            client.client().admin().indices().prepareGatewaySnapshot().execute().actionGet();
+
+            client.close();
+            for (Node node : nodes) {
+                File nodeWork = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeLocation();
+                node.close();
+                if (clearNodeWork && !settings.get("gateway.type").equals("local")) {
+                    FileSystemUtils.deleteRecursively(nodeWork);
+                }
+            }
+
+            if ((System.currentTimeMillis() - testStart) > period.millis()) {
+                logger.info("test finished, full_restart_rounds [{}]", numberOfRounds);
+                break;
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.setProperty("es.logger.prefix", "");
+
+        int numberOfNodes = 2;
+        Settings settings = ImmutableSettings.settingsBuilder()
+                .put("index.shard.check_index", true)
+                .put("gateway.type", "fs")
+                .put("gateway.recover_after_nodes", numberOfNodes)
+                .put("index.number_of_shards", 1)
+                .build();
+
+        FullRestartStressTest test = new FullRestartStressTest()
+                .settings(settings)
+                .period(TimeValue.timeValueMinutes(20))
+                .clearNodeWork(false) // only applies to shared gateway
+                .numberOfNodes(numberOfNodes)
+                .numberOfIndices(1)
+                .textTokens(150)
+                .numberOfFields(10)
+                .bulkSize(1000)
+                .numberOfDocsPerRound(10000);
+
+        test.run();
+    }
+}
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java
index 4ff43cd2928..cb0a11a8107 100644
--- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java
@@ -104,7 +104,7 @@ public class RollingRestartStressTest {
         return this;
     }
 
-    public RollingRestartStressTest cleanNodeWork(boolean cleanNodeWork) {
+    public RollingRestartStressTest cleanNodeWork(boolean clearNodeWork) {
         this.clearNodeWork = clearNodeWork;
         return this;
     }
@@ -222,7 +222,7 @@ public class RollingRestartStressTest {
             XContentBuilder json = XContentFactory.jsonBuilder().startObject()
                     .field("field", "value" + ThreadLocalRandom.current().nextInt());
 
-            int fields = ThreadLocalRandom.current().nextInt() % numberOfFields;
+            int fields = Math.abs(ThreadLocalRandom.current().nextInt()) % numberOfFields;
             for (int i = 0; i < fields; i++) {
                 json.field("num_" + i, ThreadLocalRandom.current().nextDouble());
                 int tokens = ThreadLocalRandom.current().nextInt() % textTokens;