diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 399e9607cfb..84d63ed3322 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -330,17 +330,27 @@ public class GatewayAllocator extends AbstractComponent { if (primaryNodeStore != null && primaryNodeStore.allocated()) { long sizeMatched = 0; - for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { - if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) { - sizeMatched += storeFileMetaData.length(); - } - } - logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", - shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched); - if (sizeMatched > lastSizeMatched) { - lastSizeMatched = sizeMatched; - lastDiscoNodeMatched = discoNode; + // see if we have a sync id we can make use of + if (storeFilesMetaData.syncId() != null && storeFilesMetaData.syncId().equals(primaryNodeStore.syncId())) { + logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.name(), storeFilesMetaData.syncId()); lastNodeMatched = node; + lastSizeMatched = Long.MAX_VALUE; + lastDiscoNodeMatched = discoNode; + } else { + for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) { + logger.trace("{}: node [{}] has file {}", + shard, discoNode.name(), storeFileMetaData.name()); + if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).isSame(storeFileMetaData)) { + sizeMatched += storeFileMetaData.length(); + } + } + logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", + shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched); + if (sizeMatched > lastSizeMatched) { + lastSizeMatched = sizeMatched; + lastDiscoNodeMatched = discoNode; + lastNodeMatched = node; + } } } } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1a717416a87..d038509c77e 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -73,7 +73,9 @@ public class InternalEngine extends Engine { private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final MergeSchedulerListener mergeSchedulerListener; - /** When we last pruned expired tombstones from versionMap.deletes: */ + /** + * When we last pruned expired tombstones from versionMap.deletes: + */ private volatile long lastDeleteVersionPruneTimeMSec; private final ShardIndexingService indexingService; @@ -150,10 +152,21 @@ public class InternalEngine extends Engine { long nextTranslogID = translogId.v2(); translog.newTranslog(nextTranslogID); translogIdGenerator.set(nextTranslogID); + if (translogId.v1() != null && skipInitialTranslogRecovery == false) { + // recovering from local store recoverFromTranslog(translogId.v1(), transformer); } else { - flush(true, true); + // recovering from a different source + // nocommit + // when we create the Engine on a target shard after recovery we must make sure that + // if a sync id is there then it is not overwritten by a forced flush + if (lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID) == null) { + flush(true, true); + } else { + SyncedFlushResult syncedFlushResult = 
syncFlushIfNoPendingChanges(lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID), lastCommittedSegmentInfos.getId()); + assert syncedFlushResult.equals(SyncedFlushResult.SUCCESS) : "skipped translog recovery but synced flush failed"; + } } } catch (IOException | EngineException ex) { throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex); @@ -185,7 +198,7 @@ public class InternalEngine extends Engine { final long currentTranslogId = Long.parseLong(commitUserData.get(Translog.TRANSLOG_ID_KEY)); return new Tuple<>(currentTranslogId, nextTranslogId); } - // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. + // translog id is not in the metadata - fix this inconsistency some code relies on this and old indices might not have it. writer.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(nextTranslogId))); commitIndexWriter(writer); logger.debug("no translog ID present in the current commit - creating one"); @@ -1058,7 +1071,8 @@ public class InternalEngine extends Engine { boolean verbose = false; try { verbose = Boolean.parseBoolean(System.getProperty("tests.verbose")); - } catch (Throwable ignore) {} + } catch (Throwable ignore) { + } iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); iwc.setMergeScheduler(mergeScheduler.newMergeScheduler()); MergePolicy mergePolicy = mergePolicyProvider.getMergePolicy(); @@ -1109,7 +1123,9 @@ public class InternalEngine extends Engine { } } - /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */ + /** + * Extended SearcherFactory that warms the segments if needed when acquiring a new searcher + */ class SearchFactory extends EngineSearcherFactory { SearchFactory(EngineConfig engineConfig) { @@ -1271,9 +1287,20 @@ public class InternalEngine extends Engine { IOUtils.closeWhileHandlingException(translog); throw new EngineException(shardId, "failed to recover from translog", e); } - flush(true, true); + + // nocommit: when we recover from gateway we recover ops from the translog we found and then create a new translog with new id. + // we flush here because we need to write a new translog id after recovery. + // we need to make sure here that an existing sync id is not overwritten by this flush if one exists. + // so, in case the old translog did not contain any ops, we should use the old sync id for flushing. + // nocommit because not sure if this here is the best solution for this... 
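+            // The branch below distinguishes three cases: if translog operations were replayed, a normal
+            // flush is required so that the new translog id is committed; if nothing was replayed and the
+            // last commit carries no sync id, a plain flush is still safe; if nothing was replayed but a
+            // sync id exists, a synced flush re-using that id carries it over into the new commit point.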
if (operationsRecovered > 0) { + flush(true, true); refresh("translog recovery"); + } else if (lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID) == null) { + flush(true, true); + } else { + SyncedFlushResult syncedFlushResult = syncFlushIfNoPendingChanges(lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID), lastCommittedSegmentInfos.getId()); + assert syncedFlushResult.equals(SyncedFlushResult.SUCCESS) : "no operations during translog recovery but synced flush failed"; } translog.clearUnreferenced(); } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index d8c906886b0..7e9360ccbbc 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -186,7 +186,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements @Override public void verify(String seed) { - BlobContainer testBlobContainer = blobStore.blobContainer(basePath);; + BlobContainer testBlobContainer = blobStore.blobContainer(basePath); DiscoveryNode localNode = clusterService.localNode(); if (testBlobContainer.blobExists(testBlobPrefix(seed) + "-master")) { try (OutputStream outputStream = testBlobContainer.createOutput(testBlobPrefix(seed) + "-" + localNode.getId())) { @@ -232,7 +232,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * Serializes snapshot to JSON * * @param snapshot snapshot - * @param stream the stream to output the snapshot JSON represetation to + * @param stream the stream to output the snapshot JSON represetation to * @throws IOException if an IOException occurs */ public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException { @@ -247,7 +247,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @param stream JSON * @return snapshot * @throws IOException if an IOException occurs - * */ + */ public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException { try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) { parser.nextToken(); @@ -314,7 +314,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements public BlobStoreIndexShardSnapshot loadSnapshot() { BlobStoreIndexShardSnapshot snapshot; try (InputStream stream = blobContainer.openInput(snapshotBlobName(snapshotId))) { - snapshot = readSnapshot(stream); + snapshot = readSnapshot(stream); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); } @@ -472,7 +472,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // don't have this hash we try to read that hash from the blob store // in a bwc compatible way. 
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); - } catch (Throwable e) { + } catch (Throwable e) { logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); } if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { @@ -550,7 +550,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { for (int i = 0; i < fileInfo.numberOfParts(); i++) { final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes()); - InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); + InputStream inputStream = snapshotRateLimiter == null ? inputStreamIndexInput : new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); try (OutputStream output = blobContainer.createOutput(fileInfo.partName(i))) { int len; @@ -727,14 +727,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // don't have this hash we try to read that hash from the blob store // in a bwc compatible way. maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); - } catch (Throwable e) { + } catch (Throwable e) { // if the index is broken we might not be able to read it logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); } snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); fileInfos.put(fileInfo.metadata().name(), fileInfo); } - final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(snapshotMetaData); + final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(snapshotMetaData, Collections.EMPTY_MAP); final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); for (StoreFileMetaData md : diff.identical) { FileInfo fileInfo = fileInfos.get(md.name()); @@ -804,8 +804,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { final byte[] buffer = new byte[BUFFER_SIZE]; int length; - while((length=stream.read(buffer))>0){ - indexOutput.writeBytes(buffer,0,length); + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); if (restoreRateLimiter != null) { rateLimiterListener.onRestorePause(restoreRateLimiter.pause(length)); @@ -838,7 +838,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } } - + public interface RateLimiterListener { void onRestorePause(long nanos); diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 71dd77c690e..de86d3b4dc3 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.*; import org.apache.lucene.util.*; import org.elasticsearch.ElasticsearchException; import 
org.elasticsearch.ExceptionsHelper; +import org.apache.lucene.util.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -35,6 +36,7 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; @@ -46,6 +48,7 @@ import org.elasticsearch.common.util.SingleObjectCache; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.env.ShardLock; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; @@ -166,10 +169,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * Returns a new MetadataSnapshot for the latest commit in this store or * an empty snapshot if no index exists or can not be opened. * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. */ public MetadataSnapshot getMetadataOrEmpty() throws IOException { try { @@ -185,13 +188,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref /** * Returns a new MetadataSnapshot for the latest commit in this store. * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. - * @throws FileNotFoundException if one or more files referenced by a commit are not present. - * @throws NoSuchFileException if one or more files referenced by a commit are not present. - * @throws IndexNotFoundException if no index / valid commit-point can be found in this store + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws FileNotFoundException if one or more files referenced by a commit are not present. + * @throws NoSuchFileException if one or more files referenced by a commit are not present. 
+ * @throws IndexNotFoundException if no index / valid commit-point can be found in this store */ public MetadataSnapshot getMetadata() throws IOException { return getMetadata(null); @@ -201,13 +204,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. * - * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an - * unexpected exception when opening the index reading the segments file. - * @throws IndexFormatTooOldException if the lucene index is too old to be opened. - * @throws IndexFormatTooNewException if the lucene index is too new to be opened. - * @throws FileNotFoundException if one or more files referenced by a commit are not present. - * @throws NoSuchFileException if one or more files referenced by a commit are not present. - * @throws IndexNotFoundException if the commit point can't be found in this store + * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an + * unexpected exception when opening the index reading the segments file. + * @throws IndexFormatTooOldException if the lucene index is too old to be opened. + * @throws IndexFormatTooNewException if the lucene index is too new to be opened. + * @throws FileNotFoundException if one or more files referenced by a commit are not present. + * @throws NoSuchFileException if one or more files referenced by a commit are not present. + * @throws IndexNotFoundException if the commit point can't be found in this store */ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { ensureOpen(); @@ -363,7 +366,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * @throws IOException if the index we try to read is corrupted */ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ESLogger logger) throws IOException { - try (Directory dir = new SimpleFSDirectory(indexLocation)){ + try (Directory dir = new SimpleFSDirectory(indexLocation)) { failIfCorrupted(dir, new ShardId("", 1)); return new MetadataSnapshot(null, dir, logger); } catch (IndexNotFoundException ex) { @@ -433,7 +436,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref public boolean checkIntegrityNoException(StoreFileMetaData md) { return checkIntegrityNoException(md, directory()); } - + public static boolean checkIntegrityNoException(StoreFileMetaData md, Directory directory) { try { checkIntegrity(md, directory); @@ -454,7 +457,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // throw exception if metadata is inconsistent if (!checksum.equals(md.checksum())) { throw new CorruptIndexException("inconsistent metadata: lucene checksum=" + checksum + - ", metadata checksum=" + md.checksum(), input); + ", metadata checksum=" + md.checksum(), input); } } else if (md.hasLegacyChecksum()) { // legacy checksum verification - no footer that we need to omit in the checksum! @@ -472,7 +475,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref String adler32 = Store.digestToString(checksum.getValue()); if (!adler32.equals(md.checksum())) { throw new CorruptIndexException("checksum failed (hardware problem?) 
: expected=" + md.checksum() + - " actual=" + adler32, input); + " actual=" + adler32, input); } } } @@ -530,7 +533,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * * @param reason the reason for this cleanup operation logged for each deleted file * @param sourceMetaData the metadata used for cleanup. all files in this metadata should be kept around. - * @throws IOException if an IOException occurs + * @throws IOException if an IOException occurs * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException { @@ -549,7 +552,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // FNF should not happen since we hold a write lock? } catch (IOException ex) { if (existingFile.startsWith(IndexFileNames.SEGMENTS) - || existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) { + || existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) { // TODO do we need to also fail this if we can't delete the pending commit file? // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit point around? throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); @@ -656,32 +659,60 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * * @see StoreFileMetaData */ - public final static class MetadataSnapshot implements Iterable, Streamable { + public final static class MetadataSnapshot implements Iterable, Writeable { private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class); private static final Version FIRST_LUCENE_CHECKSUM_VERSION = Version.LUCENE_4_8; - private Map metadata; + private final ImmutableMap metadata; public static final MetadataSnapshot EMPTY = new MetadataSnapshot(); - public MetadataSnapshot(Map metadata) { - this.metadata = metadata; + private final ImmutableMap commitUserData; + + public MetadataSnapshot(Map metadata, Map commitUserData) { + ImmutableMap.Builder metaDataBuilder = ImmutableMap.builder(); + this.metadata = metaDataBuilder.putAll(metadata).build(); + ImmutableMap.Builder commitUserDataBuilder = ImmutableMap.builder(); + this.commitUserData = commitUserDataBuilder.putAll(commitUserData).build(); } MetadataSnapshot() { - this.metadata = Collections.emptyMap(); + metadata = ImmutableMap.of(); + commitUserData = ImmutableMap.of(); } MetadataSnapshot(IndexCommit commit, Directory directory, ESLogger logger) throws IOException { - metadata = buildMetadata(commit, directory, logger); + Tuple, ImmutableMap> loadedMetadata = loadMetadata(commit, directory, logger); + metadata = loadedMetadata.v1(); + commitUserData = loadedMetadata.v2(); assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); } - ImmutableMap buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException { + public MetadataSnapshot(StreamInput in) throws IOException { + int size = in.readVInt(); + ImmutableMap.Builder metadataBuilder = ImmutableMap.builder(); + for (int i = 0; i < size; i++) { + StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in); + metadataBuilder.put(meta.name(), meta); + } + ImmutableMap.Builder commitUserDataBuilder = ImmutableMap.builder(); + int num = in.readVInt(); + for (int i = num; i > 0; i--) { + commitUserDataBuilder.put(in.readString(), in.readString()); + } + + 
this.commitUserData = commitUserDataBuilder.build(); + this.metadata = metadataBuilder.build(); + assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); + } + + static Tuple, ImmutableMap> loadMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException { ImmutableMap.Builder builder = ImmutableMap.builder(); Map checksumMap = readLegacyChecksums(directory).v1(); + ImmutableMap.Builder commitUserDataBuilder = ImmutableMap.builder(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); + commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); Version maxVersion = Version.LUCENE_4_0; // we don't know which version was used to write so we take the max version. for (SegmentCommitInfo info : segmentCommitInfos) { final Version version = info.info.getVersion(); @@ -734,7 +765,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref throw ex; } - return builder.build(); + return new Tuple, ImmutableMap>(builder.build(), commitUserDataBuilder.build()); } /** @@ -955,30 +986,21 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref return metadata.size(); } - public static MetadataSnapshot read(StreamInput in) throws IOException { - MetadataSnapshot storeFileMetaDatas = new MetadataSnapshot(); - storeFileMetaDatas.readFrom(in); - return storeFileMetaDatas; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - int size = in.readVInt(); - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (int i = 0; i < size; i++) { - StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in); - builder.put(meta.name(), meta); - } - this.metadata = builder.build(); - assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(this.metadata.size()); for (StoreFileMetaData meta : this) { meta.writeTo(out); } + out.writeVInt(commitUserData.size()); + for (Map.Entry entry : commitUserData.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + } + + public Map getCommitUserData() { + return commitUserData; } /** @@ -1010,6 +1032,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } return count; } + + /** + * Returns the sync id of the commit point that this MetadataSnapshot represents. 
+ * + * @return sync id if exists, else null + */ + public String getSyncId() { + return commitUserData.get(Engine.SYNC_COMMIT_ID); + } + + @Override + public MetadataSnapshot readFrom(StreamInput in) throws IOException { + return new MetadataSnapshot(in); + } } /** @@ -1360,7 +1396,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } @Override - protected StoreStats refresh() { + protected StoreStats refresh() { try { return new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos()); } catch (IOException ex) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java index 0ff00d7c008..b0d224c41ef 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java @@ -61,7 +61,7 @@ class RecoveryCleanFilesRequest extends TransportRequest { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); - snapshotFiles = Store.MetadataSnapshot.read(in); + snapshotFiles = new Store.MetadataSnapshot(in); totalTranslogOps = in.readVInt(); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6a429974f69..2535caa7c0b 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -65,6 +65,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.*; @@ -137,11 +138,11 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { * Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit} * snapshot has been performed no commit operations (files being fsync'd) * are effectively allowed on this index until all recovery phases are done - * + *

* Phase1 examines the segment files on the target node and copies over the * segments that are missing. Only segments that have the same size and * checksum can be reused - * + *

* {@code InternalEngine#recover} is responsible for snapshotting the index * and releasing the snapshot once all 3 phases of recovery are complete */ @@ -168,28 +169,45 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { // Generate a "diff" of all the identical, different, and missing // segment files on the target node, using the existing files on // the source node - final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(new Store.MetadataSnapshot(request.existingFiles())); - for (StoreFileMetaData md : diff.identical) { - response.phase1ExistingFileNames.add(md.name()); - response.phase1ExistingFileSizes.add(md.length()); - existingTotalSize += md.length(); - if (logger.isTraceEnabled()) { - logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", - indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length()); + String recoverySourceSyncId = recoverySourceMetadata.getSyncId(); + String recoveryTargetSyncId = request.metadataSnapshot().getSyncId(); + final boolean recoverWithSyncId = recoverySourceSyncId != null && + recoverySourceSyncId.equals(recoveryTargetSyncId); + if (recoverWithSyncId) { + for (StoreFileMetaData md : request.metadataSnapshot()) { + response.phase1ExistingFileNames.add(md.name()); + response.phase1ExistingFileSizes.add(md.length()); + existingTotalSize += md.length(); + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], checksum [{}], size [{}], sync ids {} coincide, will skip file copy", + indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length(), recoverySourceMetadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID)); + } + totalSize += md.length(); } - totalSize += md.length(); - } - for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { - if (request.existingFiles().containsKey(md.name())) { - logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", - indexName, shardId, request.targetNode(), md.name(), request.existingFiles().get(md.name()), md); - } else { - logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", - indexName, shardId, request.targetNode(), md.name()); + } else { + final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); + for (StoreFileMetaData md : diff.identical) { + response.phase1ExistingFileNames.add(md.name()); + response.phase1ExistingFileSizes.add(md.length()); + existingTotalSize += md.length(); + if (logger.isTraceEnabled()) { + logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", + indexName, shardId, request.targetNode(), md.name(), md.checksum(), md.length()); + } + totalSize += md.length(); + } + for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { + if (request.metadataSnapshot().asMap().containsKey(md.name())) { + logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", + indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md); + } else { + logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", + indexName, shardId, request.targetNode(), md.name()); + } + response.phase1FileNames.add(md.name()); + 
response.phase1FileSizes.add(md.length()); + totalSize += md.length(); } - response.phase1FileNames.add(md.name()); - response.phase1FileSizes.add(md.length()); - totalSize += md.length(); } response.phase1TotalSize = totalSize; response.phase1ExistingTotalSize = existingTotalSize; @@ -209,7 +227,6 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { } }); - // This latch will be used to wait until all files have been transferred to the target node final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); final CopyOnWriteArrayList exceptions = new CopyOnWriteArrayList<>(); @@ -364,8 +381,9 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { // related to this recovery (out of date segments, for example) // are deleted try { + final Store.MetadataSnapshot remainingFilesAfterCleanup = recoverWithSyncId? request.metadataSnapshot(): recoverySourceMetadata; transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, shard.translog().estimatedNumberOfOperations()), + new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), remainingFilesAfterCleanup, shard.translog().estimatedNumberOfOperations()), TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } catch (RemoteTransportException remoteException) { @@ -418,12 +436,12 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { /** * Perform phase2 of the recovery process - * + *

* Phase2 takes a snapshot of the current translog *without* acquiring the * write lock (however, the translog snapshot is a point-in-time view of * the translog). It then sends each translog operation to the target node * so it can be replayed into the new shard. - * + *

* {@code InternalEngine#recover} is responsible for taking the snapshot * of the translog and releasing it once all 3 phases of recovery are complete */ @@ -469,11 +487,11 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { /** * Perform phase 3 of the recovery process - * + *

* Phase3 again takes a snapshot of the translog, however this time the * snapshot is acquired under a write lock. The translog operations are * sent to the target node where they are replayed. - * + *

* {@code InternalEngine#recover} is responsible for taking the snapshot * of the translog, and after phase 3 completes the snapshots from all * three phases are released. @@ -587,7 +605,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { /** * Send the given snapshot's operations to this handler's target node. - * + *

* Operations are bulked into a single request depending on an operation * count limit or size-in-bytes limit * @@ -600,8 +618,8 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { final List operations = Lists.newArrayList(); Translog.Operation operation; try { - operation = snapshot.next(); // this ex should bubble up - } catch (IOException ex){ + operation = snapshot.next(); // this ex should bubble up + } catch (IOException ex) { throw new ElasticsearchException("failed to get next operation from translog", ex); } @@ -659,9 +677,10 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler { } try { operation = snapshot.next(); // this ex should bubble up - } catch (IOException ex){ + } catch (IOException ex) { throw new ElasticsearchException("failed to get next operation from translog", ex); - } } + } + } // send the leftover if (!operations.isEmpty()) { cancellableThreads.execute(new Interruptable() { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index c2af0657bc5..2f8339afc21 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -47,16 +48,24 @@ public class RecoveryState implements ToXContent, Streamable { public static enum Stage { INIT((byte) 0), - /** recovery of lucene files, either reusing local ones are copying new ones */ + /** + * recovery of lucene files, either reusing local ones are copying new ones + */ INDEX((byte) 1), - /** potentially running check index */ + /** + * potentially running check index + */ VERIFY_INDEX((byte) 2), - /** starting up the engine, replaying the translog */ + /** + * starting up the engine, replaying the translog + */ TRANSLOG((byte) 3), - /** performing final task after all translog ops have been done */ + /** + * performing final task after all translog ops have been done + */ FINALIZE((byte) 4), DONE((byte) 5); @@ -494,7 +503,9 @@ public class RecoveryState implements ToXContent, Streamable { assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]"; } - /** returns the total number of translog operations recovered so far */ + /** + * returns the total number of translog operations recovered so far + */ public synchronized int recoveredOperations() { return recovered; } @@ -587,22 +598,30 @@ public class RecoveryState implements ToXContent, Streamable { recovered += bytes; } - /** file name * */ + /** + * file name * + */ public String name() { return name; } - /** file length * */ + /** + * file length * + */ public long length() { return length; } - /** number of bytes recovered for this file (so far). 0 if the file is reused * */ + /** + * number of bytes recovered for this file (so far). 
0 if the file is reused * + */ public long recovered() { return recovered; } - /** returns true if the file is reused from a local copy */ + /** + * returns true if the file is reused from a local copy + */ public boolean reused() { return reused; } @@ -729,12 +748,16 @@ public class RecoveryState implements ToXContent, Streamable { return TimeValue.timeValueNanos(targetThrottleTimeInNanos); } - /** total number of files that are part of this recovery, both re-used and recovered */ + /** + * total number of files that are part of this recovery, both re-used and recovered + */ public synchronized int totalFileCount() { return fileDetails.size(); } - /** total number of files to be recovered (potentially not yet done) */ + /** + * total number of files to be recovered (potentially not yet done) + */ public synchronized int totalRecoverFiles() { int total = 0; for (File file : fileDetails.values()) { @@ -746,7 +769,9 @@ public class RecoveryState implements ToXContent, Streamable { } - /** number of file that were recovered (excluding on ongoing files) */ + /** + * number of file that were recovered (excluding on ongoing files) + */ public synchronized int recoveredFileCount() { int count = 0; for (File file : fileDetails.values()) { @@ -757,7 +782,9 @@ public class RecoveryState implements ToXContent, Streamable { return count; } - /** percent of recovered (i.e., not reused) files out of the total files to be recovered */ + /** + * percent of recovered (i.e., not reused) files out of the total files to be recovered + */ public synchronized float recoveredFilesPercent() { int total = 0; int recovered = 0; @@ -780,7 +807,9 @@ public class RecoveryState implements ToXContent, Streamable { } } - /** total number of bytes in th shard */ + /** + * total number of bytes in th shard + */ public synchronized long totalBytes() { long total = 0; for (File file : fileDetails.values()) { @@ -789,7 +818,9 @@ public class RecoveryState implements ToXContent, Streamable { return total; } - /** total number of bytes recovered so far, including both existing and reused */ + /** + * total number of bytes recovered so far, including both existing and reused + */ public synchronized long recoveredBytes() { long recovered = 0; for (File file : fileDetails.values()) { @@ -798,7 +829,9 @@ public class RecoveryState implements ToXContent, Streamable { return recovered; } - /** total bytes of files to be recovered (potentially not yet done) */ + /** + * total bytes of files to be recovered (potentially not yet done) + */ public synchronized long totalRecoverBytes() { long total = 0; for (File file : fileDetails.values()) { @@ -819,7 +852,9 @@ public class RecoveryState implements ToXContent, Streamable { return total; } - /** percent of bytes recovered out of total files bytes *to be* recovered */ + /** + * percent of bytes recovered out of total files bytes *to be* recovered + */ public synchronized float recoveredBytesPercent() { long total = 0; long recovered = 0; diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index fcfc9722a03..185f1e8cd07 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import 
org.elasticsearch.index.IndexShardMissingException; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.*; @@ -155,12 +156,12 @@ public class RecoveryTarget extends AbstractComponent { assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node"; logger.trace("collecting local files for {}", recoveryStatus); - Map existingFiles; + Store.MetadataSnapshot metadataSnapshot = null; try { - existingFiles = recoveryStatus.store().getMetadataOrEmpty().asMap(); + metadataSnapshot = recoveryStatus.store().getMetadataOrEmpty(); } catch (IOException e) { logger.warn("error while listing local files, recover as if there are none", e); - existingFiles = Store.MetadataSnapshot.EMPTY.asMap(); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; } catch (Exception e) { // this will be logged as warning later on... logger.trace("unexpected error while listing local files, failing recovery", e); @@ -169,7 +170,7 @@ public class RecoveryTarget extends AbstractComponent { return; } final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), - false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); + false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); final AtomicReference responseHolder = new AtomicReference<>(); try { diff --git a/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 88c14d7f0ce..b5fa3ee6047 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.transport.TransportRequest; @@ -46,7 +47,7 @@ public class StartRecoveryRequest extends TransportRequest { private boolean markAsRelocated; - private Map existingFiles; + private Store.MetadataSnapshot metadataSnapshot; private RecoveryState.Type recoveryType; @@ -57,20 +58,19 @@ public class StartRecoveryRequest extends TransportRequest { * Start recovery request. 
* * @param shardId - * @param sourceNode The node to recover from - * @param targetNode The node to recover to + * @param sourceNode The node to recover from + * @param targetNode The node to recover to * @param markAsRelocated - * @param existingFiles + * @param metadataSnapshot */ - public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map existingFiles, RecoveryState.Type recoveryType, long recoveryId) { + public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { this.recoveryId = recoveryId; this.shardId = shardId; this.sourceNode = sourceNode; this.targetNode = targetNode; this.markAsRelocated = markAsRelocated; - this.existingFiles = existingFiles; this.recoveryType = recoveryType; + this.metadataSnapshot = metadataSnapshot; } public long recoveryId() { @@ -93,14 +93,14 @@ public class StartRecoveryRequest extends TransportRequest { return markAsRelocated; } - public Map existingFiles() { - return existingFiles; - } - public RecoveryState.Type recoveryType() { return recoveryType; } + public Store.MetadataSnapshot metadataSnapshot() { + return metadataSnapshot; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -109,13 +109,9 @@ public class StartRecoveryRequest extends TransportRequest { sourceNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in); markAsRelocated = in.readBoolean(); - int size = in.readVInt(); - existingFiles = Maps.newHashMapWithExpectedSize(size); - for (int i = 0; i < size; i++) { - StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in); - existingFiles.put(md.name(), md); - } + metadataSnapshot = new Store.MetadataSnapshot(in); recoveryType = RecoveryState.Type.fromId(in.readByte()); + } @Override @@ -126,10 +122,8 @@ public class StartRecoveryRequest extends TransportRequest { sourceNode.writeTo(out); targetNode.writeTo(out); out.writeBoolean(markAsRelocated); - out.writeVInt(existingFiles.size()); - for (StoreFileMetaData md : existingFiles.values()) { - md.writeTo(out); - } + metadataSnapshot.writeTo(out); out.writeByte(recoveryType.id()); } + } diff --git a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index f95cfd8ece9..e65a260eb06 100644 --- a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; @@ -144,7 +145,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio store.incRef(); try { exists = true; - return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty().asMap()); + return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty()); } finally { store.decRef(); } @@ -153,17 +154,17 @@ public class TransportNodesListShardStoreMetaData extends 
TransportNodesOperatio // try and see if we an list unallocated IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name()); if (metaData == null) { - return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); + return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY); } String storeType = metaData.settings().get(IndexStoreModule.STORE_TYPE, "fs"); if (!storeType.contains("fs")) { - return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); + return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY); } final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings()); if (shardPath == null) { - return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); + return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY); } - return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), logger).asMap()); + return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), logger)); } finally { TimeValue took = new TimeValue(System.currentTimeMillis() - startTime); if (exists) { @@ -180,17 +181,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio } public static class StoreFilesMetaData implements Iterable, Streamable { + // here also trasmit sync id, else recovery will not use sync id because of stupid gateway allocator every now and then... private boolean allocated; private ShardId shardId; - private Map files; + Store.MetadataSnapshot metadataSnapshot; StoreFilesMetaData() { } - public StoreFilesMetaData(boolean allocated, ShardId shardId, Map files) { + public StoreFilesMetaData(boolean allocated, ShardId shardId, Store.MetadataSnapshot metadataSnapshot) { this.allocated = allocated; this.shardId = shardId; - this.files = files; + this.metadataSnapshot = metadataSnapshot; } public boolean allocated() { @@ -203,15 +205,15 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio @Override public Iterator iterator() { - return files.values().iterator(); + return metadataSnapshot.iterator(); } public boolean fileExists(String name) { - return files.containsKey(name); + return metadataSnapshot.asMap().containsKey(name); } public StoreFileMetaData file(String name) { - return files.get(name); + return metadataSnapshot.asMap().get(name); } public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException { @@ -224,22 +226,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio public void readFrom(StreamInput in) throws IOException { allocated = in.readBoolean(); shardId = ShardId.readShardId(in); - int size = in.readVInt(); - files = Maps.newHashMapWithExpectedSize(size); - for (int i = 0; i < size; i++) { - StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in); - files.put(md.name(), md); - } + this.metadataSnapshot = new Store.MetadataSnapshot(in); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(allocated); shardId.writeTo(out); - out.writeVInt(files.size()); - for (StoreFileMetaData md : files.values()) { - md.writeTo(out); - } + metadataSnapshot.writeTo(out); + } + + public String syncId() { + return metadataSnapshot.getSyncId(); } } diff --git a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java index 44321fad582..4889a6ed6c4 100644 --- 
a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java @@ -22,19 +22,26 @@ package org.elasticsearch.gateway; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.SyncedFlushService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.junit.Test; @@ -47,10 +54,7 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; /** * @@ -346,11 +350,13 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { @Test @Slow + @TestLogging("gateway:TRACE,indices.recovery:TRACE,index.engine:TRACE") public void testReusePeerRecovery() throws Exception { final Settings settings = settingsBuilder() .put("action.admin.cluster.node.shutdown.delay", "10ms") .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) .put("gateway.recover_after_nodes", 4) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4) .put(MockFSDirectoryService.CRASH_INDEX, false).build(); @@ -377,6 +383,109 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get(); + boolean useSyncIds = randomBoolean(); + if (useSyncIds == false) { + logger.info("--> disabling allocation while the cluster is shut down"); + + // Disable allocations while we are closing nodes + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(settingsBuilder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) + .get(); + logger.info("--> full 
cluster restart"); + internalCluster().fullRestart(); + + logger.info("--> waiting for cluster to return to green after first shutdown"); + ensureGreen(); + } else { + logger.info("--> trying to sync flush"); + int numShards = Integer.parseInt(client().admin().indices().prepareGetSettings("test").get().getSetting("test", "index.number_of_shards")); + SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class); + for (int i = 0; i < numShards; i++) { + assertTrue(syncedFlushService.attemptSyncedFlush(new ShardId("test", i)).success()); + } + assertSyncIdsNotNull(); + } + + logger.info("--> disabling allocation while the cluster is shut down", useSyncIds ? "" : " a second time"); + // Disable allocations while we are closing nodes + client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(settingsBuilder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) + .get(); + logger.info("--> full cluster restart"); + internalCluster().fullRestart(); + + logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second "); + ensureGreen(); + + if (useSyncIds) { + assertSyncIdsNotNull(); + } + RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); + for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { + RecoveryState recoveryState = response.recoveryState(); + long recovered = 0; + for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { + if (file.name().startsWith("segments")) { + recovered += file.length(); + } + } + if (!recoveryState.getPrimary() && (useSyncIds == false)) { + logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", + response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); + assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); + assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l)); + // we have to recover the segments file since we commit the translog ID on engine startup + assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered)); + assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1)); + assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - 1)); + assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); + } else { + if (useSyncIds && !recoveryState.getPrimary()) { + logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}", + response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); + } + assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l)); + assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes())); + assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0)); + assertThat(recoveryState.getIndex().reusedFileCount(), 
equalTo(recoveryState.getIndex().totalFileCount())); + } + } + } + + public void assertSyncIdsNotNull() { + IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test"); + for (ShardStats shardStats : indexStats.getShards()) { + assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID)); + } + } + + @Test + @Slow + @TestLogging("indices.recovery:TRACE,index.store:TRACE") + public void testSyncFlushedRecovery() throws Exception { + final Settings settings = settingsBuilder() + .put("action.admin.cluster.node.shutdown.delay", "10ms") + .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) + .put("gateway.recover_after_nodes", 4) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4).build(); + + internalCluster().startNodesAsync(4, settings).get(); + // prevent any rebalance actions during the recovery + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE))); + ensureGreen(); + logger.info("--> indexing docs"); + for (int i = 0; i < 1000; i++) { + client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); + } + logger.info("--> disabling allocation while the cluster is shut down"); // Disable allocations while we are closing nodes @@ -384,52 +493,32 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { .setTransientSettings(settingsBuilder() .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) .get(); + + SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class); + assertTrue(syncedFlushService.attemptSyncedFlush(new ShardId("test", 0)).success()); logger.info("--> full cluster restart"); internalCluster().fullRestart(); logger.info("--> waiting for cluster to return to green after first shutdown"); ensureGreen(); - - logger.info("--> disabling allocation while the cluster is shut down second time"); - // Disable allocations while we are closing nodes - client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(settingsBuilder() - .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) - .get(); - logger.info("--> full cluster restart"); - internalCluster().fullRestart(); - - logger.info("--> waiting for cluster to return to green after second shutdown"); - ensureGreen(); - + logClusterState(); RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { RecoveryState recoveryState = response.recoveryState(); - long recovered = 0; - for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { - if (file.name().startsWith("segments")) { - recovered += file.length(); - } - } if (!recoveryState.getPrimary()) { logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); - assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); - assertThat("data should have been 
reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l)); - // we have to recover the segments file since we commit the translog ID on engine startup - assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()-recovered)); - assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1)); - assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()-1)); - assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); } else { - assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l)); - assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes())); - assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0)); - assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount())); + logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", + response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); } + assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(0l)); + assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes())); + assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0)); + assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount())); } - } @Test diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 1c86db6b451..3dfdb5e629c 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2002,5 +2002,5 @@ public class InternalEngineTests extends ElasticsearchTestCase { recoveredOps.incrementAndGet(); } } - + } diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index c4e7ae9a7ea..2befce3fc44 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -32,17 +32,26 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; +import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; +import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import 
org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ElasticsearchTestCase; import org.hamcrest.Matchers; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -51,6 +60,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Adler32; +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; +import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.*; public class StoreTest extends ElasticsearchTestCase { @@ -180,6 +191,7 @@ public class StoreTest extends ElasticsearchTestCase { public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException { return segmentInfoFormat.read(directory, segmentName, segmentID, context); } + // this sucks it's a full copy of Lucene50SegmentInfoFormat but hey I couldn't find a way to make it write 4_5_0 versions // somebody was too paranoid when implementing this. ey rmuir, was that you? - go fix it :P @Override @@ -536,7 +548,7 @@ public class StoreTest extends ElasticsearchTestCase { } final long luceneChecksum; final long adler32LegacyChecksum = adler32.getValue(); - try(IndexInput indexInput = dir.openInput("lucene_checksum.bin", IOContext.DEFAULT)) { + try (IndexInput indexInput = dir.openInput("lucene_checksum.bin", IOContext.DEFAULT)) { assertEquals(luceneFileLength, indexInput.length()); luceneChecksum = CodecUtil.retrieveChecksum(indexInput); } @@ -551,8 +563,8 @@ public class StoreTest extends ElasticsearchTestCase { } { // negative check - wrong checksum - StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength, Store.digestToString(luceneChecksum+1), Version.LUCENE_4_8_0); - StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum+1)); + StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength, Store.digestToString(luceneChecksum + 1), Version.LUCENE_4_8_0); + StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum + 1)); assertTrue(legacy.hasLegacyChecksum()); assertFalse(lucene.hasLegacyChecksum()); assertFalse(Store.checkIntegrityNoException(lucene, dir)); @@ -560,8 +572,8 @@ public class StoreTest extends ElasticsearchTestCase { } { // negative check - wrong length - StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength+1, Store.digestToString(luceneChecksum), Version.LUCENE_4_8_0); - StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength+1, Store.digestToString(adler32LegacyChecksum)); + StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength + 1, Store.digestToString(luceneChecksum), Version.LUCENE_4_8_0); + StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength + 1, Store.digestToString(adler32LegacyChecksum)); assertTrue(legacy.hasLegacyChecksum()); assertFalse(lucene.hasLegacyChecksum()); assertFalse(Store.checkIntegrityNoException(lucene, dir)); @@ -616,19 +628,19 @@ public class StoreTest extends ElasticsearchTestCase { IOUtils.close(dir); } - private void 
readIndexInputFullyWithRandomSeeks(IndexInput indexInput) throws IOException{ + private void readIndexInputFullyWithRandomSeeks(IndexInput indexInput) throws IOException { BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024)); long pos = 0; while (pos < indexInput.length()) { assertEquals(pos, indexInput.getFilePointer()); int op = random().nextInt(5); - if (op == 0 ) { - int shift = 100 - randomIntBetween(0, 200); - pos = Math.min(indexInput.length() - 1, Math.max(0, pos + shift)); + if (op == 0) { + int shift = 100 - randomIntBetween(0, 200); + pos = Math.min(indexInput.length() - 1, Math.max(0, pos + shift)); indexInput.seek(pos); } else if (op == 1) { indexInput.readByte(); - pos ++; + pos++; } else { int min = (int) Math.min(indexInput.length() - pos, ref.bytes.length); indexInput.readBytes(ref.bytes, ref.offset, min); @@ -673,16 +685,18 @@ public class StoreTest extends ElasticsearchTestCase { public LuceneManagedDirectoryService(Random random) { this(random, true); } + public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) { super(new ShardId("fake", 1), ImmutableSettings.EMPTY); - dir = StoreTest.newDirectory(random); - if (dir instanceof MockDirectoryWrapper) { - ((MockDirectoryWrapper)dir).setPreventDoubleWrite(preventDoubleWrite); - // TODO: fix this test to handle virus checker - ((MockDirectoryWrapper)dir).setEnableVirusScanner(false); - } + dir = StoreTest.newDirectory(random); + if (dir instanceof MockDirectoryWrapper) { + ((MockDirectoryWrapper) dir).setPreventDoubleWrite(preventDoubleWrite); + // TODO: fix this test to handle virus checker + ((MockDirectoryWrapper) dir).setEnableVirusScanner(false); + } this.random = random; } + @Override public Directory newDirectory() throws IOException { return dir; @@ -711,11 +725,11 @@ public class StoreTest extends ElasticsearchTestCase { @Test public void testRecoveryDiffWithLegacyCommit() { Map metaDataMap = new HashMap<>(); - metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[] {1}))); + metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[]{1}))); metaDataMap.put("_0_1.del", new StoreFileMetaData("_0_1.del", 42, "foobarbaz", null, new BytesRef())); - Store.MetadataSnapshot first = new Store.MetadataSnapshot(metaDataMap); + Store.MetadataSnapshot first = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP); - Store.MetadataSnapshot second = new Store.MetadataSnapshot(metaDataMap); + Store.MetadataSnapshot second = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP); Store.RecoveryDiff recoveryDiff = first.recoveryDiff(second); assertEquals(recoveryDiff.toString(), recoveryDiff.different.size(), 2); } @@ -760,7 +774,7 @@ public class StoreTest extends ElasticsearchTestCase { store.close(); } long time = new Date().getTime(); - while(time == new Date().getTime()) { + while (time == new Date().getTime()) { Thread.sleep(10); // bump the time } Store.MetadataSnapshot second; @@ -827,7 +841,7 @@ public class StoreTest extends ElasticsearchTestCase { } Store.RecoveryDiff afterDeleteDiff = metadata.recoveryDiff(second); if (delFile != null) { - assertThat(afterDeleteDiff.identical.size(), equalTo(metadata.size()-2)); // segments_N + del file + assertThat(afterDeleteDiff.identical.size(), equalTo(metadata.size() - 2)); // segments_N + del file assertThat(afterDeleteDiff.different.size(), equalTo(0)); assertThat(afterDeleteDiff.missing.size(), equalTo(2)); } else { @@ -856,7 +870,7 @@ 
public class StoreTest extends ElasticsearchTestCase { Store.MetadataSnapshot newCommitMetaData = store.getMetadata(); Store.RecoveryDiff newCommitDiff = newCommitMetaData.recoveryDiff(metadata); if (delFile != null) { - assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size()-5)); // segments_N, del file, cfs, cfe, si for the new segment + assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size() - 5)); // segments_N, del file, cfs, cfe, si for the new segment assertThat(newCommitDiff.different.size(), equalTo(1)); // the del file must be different assertThat(newCommitDiff.different.get(0).name(), endsWith(".liv")); assertThat(newCommitDiff.missing.size(), equalTo(4)); // segments_N,cfs, cfe, si for the new segment @@ -883,7 +897,7 @@ public class StoreTest extends ElasticsearchTestCase { int docs = 1 + random().nextInt(100); int numCommits = 0; for (int i = 0; i < docs; i++) { - if (i > 0 && randomIntBetween(0, 10 ) == 0) { + if (i > 0 && randomIntBetween(0, 10) == 0) { writer.commit(); numCommits++; } @@ -948,7 +962,7 @@ public class StoreTest extends ElasticsearchTestCase { assertTrue(firstMeta.contains(file) || Store.isChecksum(file) || file.equals("write.lock")); if (Store.isChecksum(file)) { numChecksums++; - } else if (secondMeta.contains(file) == false) { + } else if (secondMeta.contains(file) == false) { numNotFound++; } @@ -967,7 +981,7 @@ public class StoreTest extends ElasticsearchTestCase { assertTrue(file, secondMeta.contains(file) || Store.isChecksum(file) || file.equals("write.lock")); if (Store.isChecksum(file)) { numChecksums++; - } else if (firstMeta.contains(file) == false) { + } else if (firstMeta.contains(file) == false) { numNotFound++; } @@ -985,7 +999,7 @@ public class StoreTest extends ElasticsearchTestCase { Map metaDataMap = new HashMap<>(); metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[]{1}))); metaDataMap.put("_0_1.del", new StoreFileMetaData("_0_1.del", 42, "foobarbaz", null, new BytesRef())); - Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(metaDataMap); + Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP); final ShardId shardId = new ShardId(new Index("index"), 1); DirectoryService directoryService = new LuceneManagedDirectoryService(random()); @@ -1009,7 +1023,7 @@ public class StoreTest extends ElasticsearchTestCase { final AtomicInteger count = new AtomicInteger(0); final ShardLock lock = new DummyShardLock(shardId); - Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, lock , new Store.OnClose() { + Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, lock, new Store.OnClose() { @Override public void handle(ShardLock theLock) { assertEquals(shardId, theLock.getShardId()); @@ -1081,4 +1095,95 @@ public class StoreTest extends ElasticsearchTestCase { } return numNonExtra; } + + @Test + public void testMetadataSnapshotStreaming() throws Exception { + + Store.MetadataSnapshot outMetadataSnapshot = createMetaDataSnapshot(); + org.elasticsearch.Version targetNodeVersion = randomVersion(random()); + + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(targetNodeVersion); + outMetadataSnapshot.writeTo(out); + + ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); + 
in.setVersion(targetNodeVersion); + Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in); + Map origEntries = new HashMap<>(); + origEntries.putAll(outMetadataSnapshot.asMap()); + for (Map.Entry entry : inMetadataSnapshot.asMap().entrySet()) { + assertThat(entry.getValue().name(), equalTo(origEntries.remove(entry.getKey()).name())); + } + assertThat(origEntries.size(), equalTo(0)); + assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData())); + } + + protected Store.MetadataSnapshot createMetaDataSnapshot() { + StoreFileMetaData storeFileMetaData1 = new StoreFileMetaData("segments", 1); + StoreFileMetaData storeFileMetaData2 = new StoreFileMetaData("no_segments", 1); + Map storeFileMetaDataMap = new HashMap<>(); + storeFileMetaDataMap.put(storeFileMetaData1.name(), storeFileMetaData1); + storeFileMetaDataMap.put(storeFileMetaData2.name(), storeFileMetaData2); + Map commitUserData = new HashMap<>(); + commitUserData.put("userdata_1", "test"); + commitUserData.put("userdata_2", "test"); + return new Store.MetadataSnapshot(storeFileMetaDataMap, commitUserData); + } + + @Test + public void testUserDataRead() throws IOException { + final ShardId shardId = new ShardId(new Index("index"), 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId)); + IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()); + SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS)); + config.setIndexDeletionPolicy(deletionPolicy); + IndexWriter writer = new IndexWriter(store.directory(), config); + Document doc = new Document(); + doc.add(new TextField("id", "1", Field.Store.NO)); + writer.addDocument(doc); + Map commitData = new HashMap<>(2); + String syncId = "a sync id"; + String translogId = "a translog id"; + commitData.put(Engine.SYNC_COMMIT_ID, syncId); + commitData.put(Translog.TRANSLOG_ID_KEY, translogId); + writer.setCommitData(commitData); + writer.commit(); + writer.close(); + Store.MetadataSnapshot metadata; + if (randomBoolean()) { + metadata = store.getMetadata(); + } else { + metadata = store.getMetadata(deletionPolicy.snapshot()); + } + assertFalse(metadata.asMap().isEmpty()); + // do not check for correct files, we have enough tests for that above + assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); + assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_ID_KEY), equalTo(translogId)); + TestUtil.checkIndex(store.directory()); + assertDeleteContent(store, directoryService); + IOUtils.close(store); + } + + @Test + public void testStreamStoreFilesMetaData() throws Exception { + Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot(); + TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(randomBoolean(), new ShardId("test", 0),metadataSnapshot); + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); + org.elasticsearch.Version targetNodeVersion = randomVersion(random()); + out.setVersion(targetNodeVersion); + outStoreFileMetaData.writeTo(out); + ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + InputStreamStreamInput in 
= new InputStreamStreamInput(inBuffer); + in.setVersion(targetNodeVersion); + TransportNodesListShardStoreMetaData.StoreFilesMetaData inStoreFileMetaData = TransportNodesListShardStoreMetaData.StoreFilesMetaData.readStoreFilesMetaData(in); + Iterator outFiles = outStoreFileMetaData.iterator(); + for (StoreFileMetaData inFile : inStoreFileMetaData) { + assertThat(inFile.name(), equalTo(outFiles.next().name())); + } + assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId())); + } } diff --git a/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTest.java b/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTest.java index c15d1d8b552..4a1586e5c45 100644 --- a/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTest.java +++ b/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTest.java @@ -19,12 +19,14 @@ package org.elasticsearch.indices.recovery; +import org.apache.lucene.index.IndexFileNames; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; @@ -32,6 +34,8 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.equalTo; @@ -49,7 +53,7 @@ public class StartRecoveryRequestTest extends ElasticsearchTestCase { new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion), new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion), true, - Collections.emptyMap(), + Store.MetadataSnapshot.EMPTY, RecoveryState.Type.RELOCATION, 1l @@ -69,11 +73,9 @@ public class StartRecoveryRequestTest extends ElasticsearchTestCase { assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode())); assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode())); assertThat(outRequest.markAsRelocated(), equalTo(inRequest.markAsRelocated())); - assertThat(outRequest.existingFiles(), equalTo(inRequest.existingFiles())); + assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap())); assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId())); assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType())); } - - }
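
Editor's note: the following sketches are illustrations added alongside the patch; they are not part of the change itself.

The reuse assertions in testReusePeerRecovery and testSyncFlushedRecovery pin down a simple contract: when both copies of a shard carry the same non-null sync id in their commit user data, peer recovery should copy nothing (recoveredFileCount == 0, reusedBytes == totalBytes); without a matching sync id the fallback is a per-file comparison, and at least the segments_N file is re-copied because the engine commits a fresh translog id on startup. The helper below is a hypothetical distillation of that contract for illustration only, not the production allocator or recovery code; SYNC_ID_KEY stands in for Engine.SYNC_COMMIT_ID.

import java.util.HashMap;
import java.util.Map;

final class SyncIdCheck {
    static final String SYNC_ID_KEY = "sync_id"; // placeholder for Engine.SYNC_COMMIT_ID

    // true when both commits advertise the same non-null sync id
    static boolean canSkipFileCopy(Map<String, String> sourceCommitData,
                                   Map<String, String> targetCommitData) {
        String sourceSyncId = sourceCommitData.get(SYNC_ID_KEY);
        return sourceSyncId != null && sourceSyncId.equals(targetCommitData.get(SYNC_ID_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        Map<String, String> target = new HashMap<>();
        source.put(SYNC_ID_KEY, "abc123");
        target.put(SYNC_ID_KEY, "abc123");
        System.out.println(canSkipFileCopy(source, target)); // true  -> expect zero recovered files
        target.put(SYNC_ID_KEY, "different");
        System.out.println(canSkipFileCopy(source, target)); // false -> fall back to the file diff
    }
}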
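
testUserDataRead boils down to one Lucene mechanism: arbitrary key/value pairs can be attached to a commit as user data and read back later, which is how the sync id and translog id travel with the index. The standalone sketch below shows that mechanism with plain Lucene, assuming the Lucene 5.x API this branch builds against (IndexWriter#setCommitData was later replaced by setLiveCommitData); the "sync_id" and "translog_id" keys are stand-ins for Engine.SYNC_COMMIT_ID and Translog.TRANSLOG_ID_KEY, and reading via DirectoryReader.listCommits replaces the Store#getMetadata() path the test uses.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitUserDataExample {
    public static void main(String[] args) throws IOException {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));

        Document doc = new Document();
        doc.add(new TextField("id", "1", Field.Store.NO));
        writer.addDocument(doc);

        // attach user data to the next commit
        Map<String, String> commitData = new HashMap<>();
        commitData.put("sync_id", "a sync id");   // stands in for Engine.SYNC_COMMIT_ID
        commitData.put("translog_id", "42");      // stands in for Translog.TRANSLOG_ID_KEY
        writer.setCommitData(commitData);
        writer.commit();
        writer.close();

        // read the user data back from the most recent commit point
        List<IndexCommit> commits = DirectoryReader.listCommits(dir);
        Map<String, String> userData = commits.get(commits.size() - 1).getUserData();
        System.out.println("sync_id = " + userData.get("sync_id"));
        System.out.println("translog_id = " + userData.get("translog_id"));
        dir.close();
    }
}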
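
testMetadataSnapshotStreaming, testStreamStoreFilesMetaData, and the updated StartRecoveryRequestTest all follow the same round-trip pattern: serialize the object for a given target node version, deserialize it again, and assert that every file entry and every commit user data entry (including the sync id) survives. The sketch below shows that pattern in isolation; it deliberately does not use the Elasticsearch StreamOutput/StreamInput classes, and CommitInfo with its fields is a hypothetical stand-in for a snapshot that carries commit user data next to per-file metadata.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class CommitInfo {
    final Map<String, Long> fileLengths;   // file name -> length
    final Map<String, String> userData;    // commit user data, e.g. the sync id

    CommitInfo(Map<String, Long> fileLengths, Map<String, String> userData) {
        this.fileLengths = fileLengths;
        this.userData = userData;
    }

    void writeTo(DataOutputStream out) throws IOException {
        out.writeInt(fileLengths.size());
        for (Map.Entry<String, Long> e : fileLengths.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeLong(e.getValue());
        }
        out.writeInt(userData.size());
        for (Map.Entry<String, String> e : userData.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    static CommitInfo readFrom(DataInputStream in) throws IOException {
        Map<String, Long> files = new HashMap<>();
        int numFiles = in.readInt();
        for (int i = 0; i < numFiles; i++) {
            files.put(in.readUTF(), in.readLong());
        }
        Map<String, String> userData = new HashMap<>();
        int numEntries = in.readInt();
        for (int i = 0; i < numEntries; i++) {
            userData.put(in.readUTF(), in.readUTF());
        }
        return new CommitInfo(files, userData);
    }
}

class RoundTripExample {
    public static void main(String[] args) throws IOException {
        Map<String, Long> files = new HashMap<>();
        files.put("segments_1", 50L);
        files.put("_0.cfs", 1024L);
        Map<String, String> userData = new HashMap<>();
        userData.put("sync_id", "a sync id");          // same idea as Engine.SYNC_COMMIT_ID
        userData.put("translog_id", "a translog id");  // same idea as Translog.TRANSLOG_ID_KEY
        CommitInfo original = new CommitInfo(files, userData);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.writeTo(new DataOutputStream(buffer));
        CommitInfo restored = CommitInfo.readFrom(
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // the same checks the tests make: every entry survives the round trip (run with -ea)
        assert restored.fileLengths.equals(original.fileLengths);
        assert restored.userData.equals(original.userData);
        System.out.println("round trip ok, sync id = " + restored.userData.get("sync_id"));
    }
}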