use sync id when recovering

Skip phase 1 of recovery in case an identical sync id was found on primary
and replica. Relates to #10032

closes #10775
This commit is contained in:
Britta Weber 2015-04-24 10:57:53 +02:00
parent d6e0ab3a10
commit 43eae13c57
14 changed files with 558 additions and 242 deletions

View File

@ -330,7 +330,16 @@ public class GatewayAllocator extends AbstractComponent {
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
long sizeMatched = 0;
// see if we have a sync id we can make use of
if (storeFilesMetaData.syncId() != null && storeFilesMetaData.syncId().equals(primaryNodeStore.syncId())) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard,, storeFilesMetaData.syncId());
lastNodeMatched = node;
lastSizeMatched = Long.MAX_VALUE;
lastDiscoNodeMatched = discoNode;
} else {
for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
logger.trace("{}: node [{}] has file {}",
if (primaryNodeStore.fileExists( && primaryNodeStore.file( {
sizeMatched += storeFileMetaData.length();
@ -347,6 +356,7 @@ public class GatewayAllocator extends AbstractComponent {
if (lastNodeMatched != null) {
// we only check on THROTTLE since we checked before before on NO

View File

@ -73,7 +73,9 @@ public class InternalEngine extends Engine {
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final MergeSchedulerListener mergeSchedulerListener;
/** When we last pruned expired tombstones from versionMap.deletes: */
* When we last pruned expired tombstones from versionMap.deletes:
private volatile long lastDeleteVersionPruneTimeMSec;
private final ShardIndexingService indexingService;
@ -150,10 +152,21 @@ public class InternalEngine extends Engine {
long nextTranslogID = translogId.v2();
if (translogId.v1() != null && skipInitialTranslogRecovery == false) {
// recovering from local store
recoverFromTranslog(translogId.v1(), transformer);
} else {
// recovering from a different source
// nocommit
// when we create the Engine on a target shard after recovery we must make sure that
// if a sync id is there then it is not overwritten by a forced flush
if (lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID) == null) {
flush(true, true);
} else {
SyncedFlushResult syncedFlushResult = syncFlushIfNoPendingChanges(lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID), lastCommittedSegmentInfos.getId());
assert syncedFlushResult.equals(SyncedFlushResult.SUCCESS) : "skipped translog recovery but synced flush failed";
} catch (IOException | EngineException ex) {
throw new EngineCreationFailureException(shardId, "failed to recover from translog", ex);
@ -1058,7 +1071,8 @@ public class InternalEngine extends Engine {
boolean verbose = false;
try {
verbose = Boolean.parseBoolean(System.getProperty("tests.verbose"));
} catch (Throwable ignore) {}
} catch (Throwable ignore) {
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
MergePolicy mergePolicy = mergePolicyProvider.getMergePolicy();
@ -1109,7 +1123,9 @@ public class InternalEngine extends Engine {
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
* Extended SearcherFactory that warms the segments if needed when acquiring a new searcher
class SearchFactory extends EngineSearcherFactory {
SearchFactory(EngineConfig engineConfig) {
@ -1271,9 +1287,20 @@ public class InternalEngine extends Engine {
throw new EngineException(shardId, "failed to recover from translog", e);
flush(true, true);
// nocommit: when we recover from gateway we recover ops from the translog we found and then create a new translog with new id.
// we flush here because we need to write a new translog id after recovery.
// we need to make sure here that an existing sync id is not overwritten by this flush if one exists.
// so, in case the old translog did not contain any ops, we should use the old sync id for flushing.
// nocommit because not sure if this here is the best solution for this...
if (operationsRecovered > 0) {
flush(true, true);
refresh("translog recovery");
} else if (lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID) == null) {
flush(true, true);
} else {
SyncedFlushResult syncedFlushResult = syncFlushIfNoPendingChanges(lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID), lastCommittedSegmentInfos.getId());
assert syncedFlushResult.equals(SyncedFlushResult.SUCCESS) : "no operations during translog recovery but synced flush failed";

View File

@ -186,7 +186,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
public void verify(String seed) {
BlobContainer testBlobContainer = blobStore.blobContainer(basePath);;
BlobContainer testBlobContainer = blobStore.blobContainer(basePath);
DiscoveryNode localNode = clusterService.localNode();
if (testBlobContainer.blobExists(testBlobPrefix(seed) + "-master")) {
try (OutputStream outputStream = testBlobContainer.createOutput(testBlobPrefix(seed) + "-" + localNode.getId())) {
@ -247,7 +247,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @param stream JSON
* @return snapshot
* @throws IOException if an IOException occurs
* */
public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) {
@ -734,7 +734,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(snapshotMetaData);
final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(snapshotMetaData, Collections.EMPTY_MAP);
final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
FileInfo fileInfo = fileInfos.get(;
@ -804,8 +804,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(, length);
if (restoreRateLimiter != null) {

View File

@ -28,6 +28,7 @@ import*;
import org.apache.lucene.util.*;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.apache.lucene.util.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -35,6 +36,7 @@ import;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
@ -46,6 +48,7 @@ import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -363,7 +366,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @throws IOException if the index we try to read is corrupted
public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)){
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, new ShardId("", 1));
return new MetadataSnapshot(null, dir, logger);
} catch (IndexNotFoundException ex) {
@ -656,32 +659,60 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @see StoreFileMetaData
public final static class MetadataSnapshot implements Iterable<StoreFileMetaData>, Streamable {
public final static class MetadataSnapshot implements Iterable<StoreFileMetaData>, Writeable<MetadataSnapshot> {
private static final ESLogger logger = Loggers.getLogger(MetadataSnapshot.class);
private static final Version FIRST_LUCENE_CHECKSUM_VERSION = Version.LUCENE_4_8;
private Map<String, StoreFileMetaData> metadata;
private final ImmutableMap<String, StoreFileMetaData> metadata;
public static final MetadataSnapshot EMPTY = new MetadataSnapshot();
public MetadataSnapshot(Map<String, StoreFileMetaData> metadata) {
this.metadata = metadata;
private final ImmutableMap<String, String> commitUserData;
public MetadataSnapshot(Map<String, StoreFileMetaData> metadata, Map<String, String> commitUserData) {
ImmutableMap.Builder<String, StoreFileMetaData> metaDataBuilder = ImmutableMap.builder();
this.metadata = metaDataBuilder.putAll(metadata).build();
ImmutableMap.Builder<String, String> commitUserDataBuilder = ImmutableMap.builder();
this.commitUserData = commitUserDataBuilder.putAll(commitUserData).build();
MetadataSnapshot() {
this.metadata = Collections.emptyMap();
metadata = ImmutableMap.of();
commitUserData = ImmutableMap.of();
MetadataSnapshot(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
metadata = buildMetadata(commit, directory, logger);
Tuple<ImmutableMap<String, StoreFileMetaData>, ImmutableMap<String, String>> loadedMetadata = loadMetadata(commit, directory, logger);
metadata = loadedMetadata.v1();
commitUserData = loadedMetadata.v2();
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
public MetadataSnapshot(StreamInput in) throws IOException {
int size = in.readVInt();
ImmutableMap.Builder<String, StoreFileMetaData> metadataBuilder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in);
metadataBuilder.put(, meta);
ImmutableMap.Builder<String, String> commitUserDataBuilder = ImmutableMap.builder();
int num = in.readVInt();
for (int i = num; i > 0; i--) {
commitUserDataBuilder.put(in.readString(), in.readString());
this.commitUserData =;
this.metadata =;
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
static Tuple<ImmutableMap<String, StoreFileMetaData>, ImmutableMap<String, String>> loadMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
Map<String, String> checksumMap = readLegacyChecksums(directory).v1();
ImmutableMap.Builder<String, String> commitUserDataBuilder = ImmutableMap.builder();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
Version maxVersion = Version.LUCENE_4_0; // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version =;
@ -734,7 +765,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
throw ex;
return new Tuple<ImmutableMap<String, StoreFileMetaData>, ImmutableMap<String, String>>(,;
@ -955,30 +986,21 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return metadata.size();
public static MetadataSnapshot read(StreamInput in) throws IOException {
MetadataSnapshot storeFileMetaDatas = new MetadataSnapshot();
return storeFileMetaDatas;
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
ImmutableMap.Builder<String, StoreFileMetaData> builder = ImmutableMap.builder();
for (int i = 0; i < size; i++) {
StoreFileMetaData meta = StoreFileMetaData.readStoreFileMetaData(in);
builder.put(, meta);
this.metadata =;
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
public void writeTo(StreamOutput out) throws IOException {
for (StoreFileMetaData meta : this) {
for (Map.Entry<String, String> entry : commitUserData.entrySet()) {
public Map<String, String> getCommitUserData() {
return commitUserData;
@ -1010,6 +1032,20 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return count;
* Returns the sync id of the commit point that this MetadataSnapshot represents.
* @return sync id if exists, else null
public String getSyncId() {
return commitUserData.get(Engine.SYNC_COMMIT_ID);
public MetadataSnapshot readFrom(StreamInput in) throws IOException {
return new MetadataSnapshot(in);

View File

@ -61,7 +61,7 @@ class RecoveryCleanFilesRequest extends TransportRequest {
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
snapshotFiles =;
snapshotFiles = new Store.MetadataSnapshot(in);
totalTranslogOps = in.readVInt();

View File

@ -65,6 +65,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
@ -137,11 +138,11 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
* Perform phase1 of the recovery operations. Once this {@link SnapshotIndexCommit}
* snapshot has been performed no commit operations (files being fsync'd)
* are effectively allowed on this index until all recovery phases are done
* <p/>
* Phase1 examines the segment files on the target node and copies over the
* segments that are missing. Only segments that have the same size and
* checksum can be reused
* <p/>
* {@code InternalEngine#recover} is responsible for snapshotting the index
* and releasing the snapshot once all 3 phases of recovery are complete
@ -168,7 +169,23 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
// Generate a "diff" of all the identical, different, and missing
// segment files on the target node, using the existing files on
// the source node
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(new Store.MetadataSnapshot(request.existingFiles()));
String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
final boolean recoverWithSyncId = recoverySourceSyncId != null &&
if (recoverWithSyncId) {
for (StoreFileMetaData md : request.metadataSnapshot()) {
existingTotalSize += md.length();
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], checksum [{}], size [{}], sync ids {} coincide, will skip file copy",
indexName, shardId, request.targetNode(),, md.checksum(), md.length(), recoverySourceMetadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID));
totalSize += md.length();
} else {
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
for (StoreFileMetaData md : diff.identical) {
@ -180,9 +197,9 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
totalSize += md.length();
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
if (request.existingFiles().containsKey( {
if (request.metadataSnapshot().asMap().containsKey( {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
indexName, shardId, request.targetNode(),, request.existingFiles().get(, md);
indexName, shardId, request.targetNode(),, request.metadataSnapshot().asMap().get(, md);
} else {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote",
indexName, shardId, request.targetNode(),;
@ -191,6 +208,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
totalSize += md.length();
response.phase1TotalSize = totalSize;
response.phase1ExistingTotalSize = existingTotalSize;
@ -209,7 +227,6 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
// This latch will be used to wait until all files have been transferred to the target node
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
@ -364,8 +381,9 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
// related to this recovery (out of date segments, for example)
// are deleted
try {
final Store.MetadataSnapshot remainingFilesAfterCleanup = recoverWithSyncId? request.metadataSnapshot(): recoverySourceMetadata;
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, shard.translog().estimatedNumberOfOperations()),
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), remainingFilesAfterCleanup, shard.translog().estimatedNumberOfOperations()),
} catch (RemoteTransportException remoteException) {
@ -418,12 +436,12 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
* Perform phase2 of the recovery process
* <p/>
* Phase2 takes a snapshot of the current translog *without* acquiring the
* write lock (however, the translog snapshot is a point-in-time view of
* the translog). It then sends each translog operation to the target node
* so it can be replayed into the new shard.
* <p/>
* {@code InternalEngine#recover} is responsible for taking the snapshot
* of the translog and releasing it once all 3 phases of recovery are complete
@ -469,11 +487,11 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
* Perform phase 3 of the recovery process
* <p/>
* Phase3 again takes a snapshot of the translog, however this time the
* snapshot is acquired under a write lock. The translog operations are
* sent to the target node where they are replayed.
* <p/>
* {@code InternalEngine#recover} is responsible for taking the snapshot
* of the translog, and after phase 3 completes the snapshots from all
* three phases are released.
@ -587,7 +605,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
* Send the given snapshot's operations to this handler's target node.
* <p/>
* Operations are bulked into a single request depending on an operation
* count limit or size-in-bytes limit
@ -601,7 +619,7 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
Translog.Operation operation;
try {
operation =; // this ex should bubble up
} catch (IOException ex){
} catch (IOException ex) {
throw new ElasticsearchException("failed to get next operation from translog", ex);
@ -659,9 +677,10 @@ public class RecoverySourceHandler implements Engine.RecoveryHandler {
try {
operation =; // this ex should bubble up
} catch (IOException ex){
} catch (IOException ex) {
throw new ElasticsearchException("failed to get next operation from translog", ex);
} }
// send the leftover
if (!operations.isEmpty()) {
cancellableThreads.execute(new Interruptable() {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -47,16 +48,24 @@ public class RecoveryState implements ToXContent, Streamable {
public static enum Stage {
INIT((byte) 0),
/** recovery of lucene files, either reusing local ones are copying new ones */
* recovery of lucene files, either reusing local ones are copying new ones
INDEX((byte) 1),
/** potentially running check index */
* potentially running check index
VERIFY_INDEX((byte) 2),
/** starting up the engine, replaying the translog */
* starting up the engine, replaying the translog
TRANSLOG((byte) 3),
/** performing final task after all translog ops have been done */
* performing final task after all translog ops have been done
FINALIZE((byte) 4),
DONE((byte) 5);
@ -494,7 +503,9 @@ public class RecoveryState implements ToXContent, Streamable {
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
/** returns the total number of translog operations recovered so far */
* returns the total number of translog operations recovered so far
public synchronized int recoveredOperations() {
return recovered;
@ -587,22 +598,30 @@ public class RecoveryState implements ToXContent, Streamable {
recovered += bytes;
/** file name * */
* file name *
public String name() {
return name;
/** file length * */
* file length *
public long length() {
return length;
/** number of bytes recovered for this file (so far). 0 if the file is reused * */
* number of bytes recovered for this file (so far). 0 if the file is reused *
public long recovered() {
return recovered;
/** returns true if the file is reused from a local copy */
* returns true if the file is reused from a local copy
public boolean reused() {
return reused;
@ -729,12 +748,16 @@ public class RecoveryState implements ToXContent, Streamable {
return TimeValue.timeValueNanos(targetThrottleTimeInNanos);
/** total number of files that are part of this recovery, both re-used and recovered */
* total number of files that are part of this recovery, both re-used and recovered
public synchronized int totalFileCount() {
return fileDetails.size();
/** total number of files to be recovered (potentially not yet done) */
* total number of files to be recovered (potentially not yet done)
public synchronized int totalRecoverFiles() {
int total = 0;
for (File file : fileDetails.values()) {
@ -746,7 +769,9 @@ public class RecoveryState implements ToXContent, Streamable {
/** number of file that were recovered (excluding on ongoing files) */
* number of file that were recovered (excluding on ongoing files)
public synchronized int recoveredFileCount() {
int count = 0;
for (File file : fileDetails.values()) {
@ -757,7 +782,9 @@ public class RecoveryState implements ToXContent, Streamable {
return count;
/** percent of recovered (i.e., not reused) files out of the total files to be recovered */
* percent of recovered (i.e., not reused) files out of the total files to be recovered
public synchronized float recoveredFilesPercent() {
int total = 0;
int recovered = 0;
@ -780,7 +807,9 @@ public class RecoveryState implements ToXContent, Streamable {
/** total number of bytes in th shard */
* total number of bytes in th shard
public synchronized long totalBytes() {
long total = 0;
for (File file : fileDetails.values()) {
@ -789,7 +818,9 @@ public class RecoveryState implements ToXContent, Streamable {
return total;
/** total number of bytes recovered so far, including both existing and reused */
* total number of bytes recovered so far, including both existing and reused
public synchronized long recoveredBytes() {
long recovered = 0;
for (File file : fileDetails.values()) {
@ -798,7 +829,9 @@ public class RecoveryState implements ToXContent, Streamable {
return recovered;
/** total bytes of files to be recovered (potentially not yet done) */
* total bytes of files to be recovered (potentially not yet done)
public synchronized long totalRecoverBytes() {
long total = 0;
for (File file : fileDetails.values()) {
@ -819,7 +852,9 @@ public class RecoveryState implements ToXContent, Streamable {
return total;
/** percent of bytes recovered out of total files bytes *to be* recovered */
* percent of bytes recovered out of total files bytes *to be* recovered
public synchronized float recoveredBytesPercent() {
long total = 0;
long recovered = 0;

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;
@ -155,12 +156,12 @@ public class RecoveryTarget extends AbstractComponent {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";
logger.trace("collecting local files for {}", recoveryStatus);
Map<String, StoreFileMetaData> existingFiles;
Store.MetadataSnapshot metadataSnapshot = null;
try {
existingFiles =;
metadataSnapshot =;
} catch (IOException e) {
logger.warn("error while listing local files, recover as if there are none", e);
existingFiles = Store.MetadataSnapshot.EMPTY.asMap();
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
} catch (Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while listing local files, failing recovery", e);
@ -169,7 +170,7 @@ public class RecoveryTarget extends AbstractComponent {
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
try {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
@ -46,7 +47,7 @@ public class StartRecoveryRequest extends TransportRequest {
private boolean markAsRelocated;
private Map<String, StoreFileMetaData> existingFiles;
private Store.MetadataSnapshot metadataSnapshot;
private RecoveryState.Type recoveryType;
@ -60,17 +61,16 @@ public class StartRecoveryRequest extends TransportRequest {
* @param sourceNode The node to recover from
* @param targetNode The node to recover to
* @param markAsRelocated
* @param existingFiles
* @param metadataSnapshot
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Map<String,
StoreFileMetaData> existingFiles, RecoveryState.Type recoveryType, long recoveryId) {
public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.markAsRelocated = markAsRelocated;
this.existingFiles = existingFiles;
this.recoveryType = recoveryType;
this.metadataSnapshot = metadataSnapshot;
public long recoveryId() {
@ -93,14 +93,14 @@ public class StartRecoveryRequest extends TransportRequest {
return markAsRelocated;
public Map<String, StoreFileMetaData> existingFiles() {
return existingFiles;
public RecoveryState.Type recoveryType() {
return recoveryType;
public Store.MetadataSnapshot metadataSnapshot() {
return metadataSnapshot;
public void readFrom(StreamInput in) throws IOException {
@ -109,13 +109,9 @@ public class StartRecoveryRequest extends TransportRequest {
sourceNode = DiscoveryNode.readNode(in);
targetNode = DiscoveryNode.readNode(in);
markAsRelocated = in.readBoolean();
int size = in.readVInt();
existingFiles = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
existingFiles.put(, md);
metadataSnapshot = new Store.MetadataSnapshot(in);
recoveryType = RecoveryState.Type.fromId(in.readByte());
@ -126,10 +122,8 @@ public class StartRecoveryRequest extends TransportRequest {
for (StoreFileMetaData md : existingFiles.values()) {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
@ -144,7 +145,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
try {
exists = true;
return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty().asMap());
return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty());
} finally {
@ -153,17 +154,17 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
// try and see if we an list unallocated
IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name());
if (metaData == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY);
String storeType = metaData.settings().get(IndexStoreModule.STORE_TYPE, "fs");
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY);
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings());
if (shardPath == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
return new StoreFilesMetaData(false, shardId, Store.MetadataSnapshot.EMPTY);
return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), logger).asMap());
return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), logger));
} finally {
TimeValue took = new TimeValue(System.currentTimeMillis() - startTime);
if (exists) {
@ -180,17 +181,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
// here also trasmit sync id, else recovery will not use sync id because of stupid gateway allocator every now and then...
private boolean allocated;
private ShardId shardId;
private Map<String, StoreFileMetaData> files;
Store.MetadataSnapshot metadataSnapshot;
StoreFilesMetaData() {
public StoreFilesMetaData(boolean allocated, ShardId shardId, Map<String, StoreFileMetaData> files) {
public StoreFilesMetaData(boolean allocated, ShardId shardId, Store.MetadataSnapshot metadataSnapshot) {
this.allocated = allocated;
this.shardId = shardId;
this.files = files;
this.metadataSnapshot = metadataSnapshot;
public boolean allocated() {
@ -203,15 +205,15 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
public Iterator<StoreFileMetaData> iterator() {
return files.values().iterator();
return metadataSnapshot.iterator();
public boolean fileExists(String name) {
return files.containsKey(name);
return metadataSnapshot.asMap().containsKey(name);
public StoreFileMetaData file(String name) {
return files.get(name);
return metadataSnapshot.asMap().get(name);
public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException {
@ -224,22 +226,18 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
public void readFrom(StreamInput in) throws IOException {
allocated = in.readBoolean();
shardId = ShardId.readShardId(in);
int size = in.readVInt();
files = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
files.put(, md);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
public void writeTo(StreamOutput out) throws IOException {
for (StoreFileMetaData md : files.values()) {
public String syncId() {
return metadataSnapshot.getSyncId();

View File

@ -22,19 +22,26 @@ package org.elasticsearch.gateway;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SyncedFlushService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
@ -47,10 +54,7 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
@ -346,11 +350,13 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
public void testReusePeerRecovery() throws Exception {
final Settings settings = settingsBuilder()
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put("gateway.recover_after_nodes", 4)
.put(MockFSDirectoryService.CRASH_INDEX, false).build();
@ -377,6 +383,8 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges
boolean useSyncIds = randomBoolean();
if (useSyncIds == false) {"--> disabling allocation while the cluster is shut down");
// Disable allocations while we are closing nodes
@ -389,8 +397,17 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {"--> waiting for cluster to return to green after first shutdown");
} else {"--> trying to sync flush");
int numShards = Integer.parseInt(client().admin().indices().prepareGetSettings("test").get().getSetting("test", "index.number_of_shards"));
SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class);
for (int i = 0; i < numShards; i++) {
assertTrue(syncedFlushService.attemptSyncedFlush(new ShardId("test", i)).success());
}"--> disabling allocation while the cluster is shut down second time");"--> disabling allocation while the cluster is shut down", useSyncIds ? "" : " a second time");
// Disable allocations while we are closing nodes
@ -399,9 +416,12 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {"--> full cluster restart");
internalCluster().fullRestart();"--> waiting for cluster to return to green after second shutdown");"--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
if (useSyncIds) {
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
@ -411,25 +431,94 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
recovered += file.length();
if (!recoveryState.getPrimary()) {
if (!recoveryState.getPrimary() && (useSyncIds == false)) {"--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l));
// we have to recover the segments file since we commit the translog ID on engine startup
assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()-recovered));
assertThat("all bytes should be reused except of the segments file", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes() - recovered));
assertThat("no files should be recovered except of the segments file", recoveryState.getIndex().recoveredFileCount(), equalTo(1));
assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()-1));
assertThat("all files should be reused except of the segments file", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount() - 1));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
} else {
if (useSyncIds && !recoveryState.getPrimary()) {"--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
public void assertSyncIdsNotNull() {
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
public void testSyncFlushedRecovery() throws Exception {
final Settings settings = settingsBuilder()
.put("action.admin.cluster.node.shutdown.delay", "10ms")
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
.put("gateway.recover_after_nodes", 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 4).build();
internalCluster().startNodesAsync(4, settings).get();
// prevent any rebalance actions during the recovery
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
ensureGreen();"--> indexing docs");
for (int i = 0; i < 1000; i++) {
client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
}"--> disabling allocation while the cluster is shut down");
// Disable allocations while we are closing nodes
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
SyncedFlushService syncedFlushService = internalCluster().getInstance(SyncedFlushService.class);
assertTrue(syncedFlushService.attemptSyncedFlush(new ShardId("test", 0)).success());"--> full cluster restart");
internalCluster().fullRestart();"--> waiting for cluster to return to green after first shutdown");
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
if (!recoveryState.getPrimary()) {"--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
} else {"--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(0l));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));

View File

@ -32,17 +32,26 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.nio.file.NoSuchFileException;
@ -51,6 +60,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.*;
public class StoreTest extends ElasticsearchTestCase {
@ -180,6 +191,7 @@ public class StoreTest extends ElasticsearchTestCase {
public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
return, segmentName, segmentID, context);
// this sucks it's a full copy of Lucene50SegmentInfoFormat but hey I couldn't find a way to make it write 4_5_0 versions
// somebody was too paranoid when implementing this. ey rmuir, was that you? - go fix it :P
@ -536,7 +548,7 @@ public class StoreTest extends ElasticsearchTestCase {
final long luceneChecksum;
final long adler32LegacyChecksum = adler32.getValue();
try(IndexInput indexInput = dir.openInput("lucene_checksum.bin", IOContext.DEFAULT)) {
try (IndexInput indexInput = dir.openInput("lucene_checksum.bin", IOContext.DEFAULT)) {
assertEquals(luceneFileLength, indexInput.length());
luceneChecksum = CodecUtil.retrieveChecksum(indexInput);
@ -551,8 +563,8 @@ public class StoreTest extends ElasticsearchTestCase {
{ // negative check - wrong checksum
StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength, Store.digestToString(luceneChecksum+1), Version.LUCENE_4_8_0);
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum+1));
StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength, Store.digestToString(luceneChecksum + 1), Version.LUCENE_4_8_0);
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength, Store.digestToString(adler32LegacyChecksum + 1));
assertFalse(Store.checkIntegrityNoException(lucene, dir));
@ -560,8 +572,8 @@ public class StoreTest extends ElasticsearchTestCase {
{ // negative check - wrong length
StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength+1, Store.digestToString(luceneChecksum), Version.LUCENE_4_8_0);
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength+1, Store.digestToString(adler32LegacyChecksum));
StoreFileMetaData lucene = new StoreFileMetaData("lucene_checksum.bin", luceneFileLength + 1, Store.digestToString(luceneChecksum), Version.LUCENE_4_8_0);
StoreFileMetaData legacy = new StoreFileMetaData("legacy.bin", legacyFileLength + 1, Store.digestToString(adler32LegacyChecksum));
assertFalse(Store.checkIntegrityNoException(lucene, dir));
@ -616,19 +628,19 @@ public class StoreTest extends ElasticsearchTestCase {
private void readIndexInputFullyWithRandomSeeks(IndexInput indexInput) throws IOException{
private void readIndexInputFullyWithRandomSeeks(IndexInput indexInput) throws IOException {
BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024));
long pos = 0;
while (pos < indexInput.length()) {
assertEquals(pos, indexInput.getFilePointer());
int op = random().nextInt(5);
if (op == 0 ) {
if (op == 0) {
int shift = 100 - randomIntBetween(0, 200);
pos = Math.min(indexInput.length() - 1, Math.max(0, pos + shift));;
} else if (op == 1) {
pos ++;
} else {
int min = (int) Math.min(indexInput.length() - pos, ref.bytes.length);
indexInput.readBytes(ref.bytes, ref.offset, min);
@ -673,16 +685,18 @@ public class StoreTest extends ElasticsearchTestCase {
public LuceneManagedDirectoryService(Random random) {
this(random, true);
public LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) {
super(new ShardId("fake", 1), ImmutableSettings.EMPTY);
dir = StoreTest.newDirectory(random);
if (dir instanceof MockDirectoryWrapper) {
((MockDirectoryWrapper) dir).setPreventDoubleWrite(preventDoubleWrite);
// TODO: fix this test to handle virus checker
((MockDirectoryWrapper) dir).setEnableVirusScanner(false);
this.random = random;
public Directory newDirectory() throws IOException {
return dir;
@ -711,11 +725,11 @@ public class StoreTest extends ElasticsearchTestCase {
public void testRecoveryDiffWithLegacyCommit() {
Map<String, StoreFileMetaData> metaDataMap = new HashMap<>();
metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[] {1})));
metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[]{1})));
metaDataMap.put("_0_1.del", new StoreFileMetaData("_0_1.del", 42, "foobarbaz", null, new BytesRef()));
Store.MetadataSnapshot first = new Store.MetadataSnapshot(metaDataMap);
Store.MetadataSnapshot first = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP);
Store.MetadataSnapshot second = new Store.MetadataSnapshot(metaDataMap);
Store.MetadataSnapshot second = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP);
Store.RecoveryDiff recoveryDiff = first.recoveryDiff(second);
assertEquals(recoveryDiff.toString(), recoveryDiff.different.size(), 2);
@ -760,7 +774,7 @@ public class StoreTest extends ElasticsearchTestCase {
long time = new Date().getTime();
while(time == new Date().getTime()) {
while (time == new Date().getTime()) {
Thread.sleep(10); // bump the time
Store.MetadataSnapshot second;
@ -827,7 +841,7 @@ public class StoreTest extends ElasticsearchTestCase {
Store.RecoveryDiff afterDeleteDiff = metadata.recoveryDiff(second);
if (delFile != null) {
assertThat(afterDeleteDiff.identical.size(), equalTo(metadata.size()-2)); // segments_N + del file
assertThat(afterDeleteDiff.identical.size(), equalTo(metadata.size() - 2)); // segments_N + del file
assertThat(afterDeleteDiff.different.size(), equalTo(0));
assertThat(afterDeleteDiff.missing.size(), equalTo(2));
} else {
@ -856,7 +870,7 @@ public class StoreTest extends ElasticsearchTestCase {
Store.MetadataSnapshot newCommitMetaData = store.getMetadata();
Store.RecoveryDiff newCommitDiff = newCommitMetaData.recoveryDiff(metadata);
if (delFile != null) {
assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size()-5)); // segments_N, del file, cfs, cfe, si for the new segment
assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetaData.size() - 5)); // segments_N, del file, cfs, cfe, si for the new segment
assertThat(newCommitDiff.different.size(), equalTo(1)); // the del file must be different
assertThat(newCommitDiff.different.get(0).name(), endsWith(".liv"));
assertThat(newCommitDiff.missing.size(), equalTo(4)); // segments_N,cfs, cfe, si for the new segment
@ -883,7 +897,7 @@ public class StoreTest extends ElasticsearchTestCase {
int docs = 1 + random().nextInt(100);
int numCommits = 0;
for (int i = 0; i < docs; i++) {
if (i > 0 && randomIntBetween(0, 10 ) == 0) {
if (i > 0 && randomIntBetween(0, 10) == 0) {
@ -985,7 +999,7 @@ public class StoreTest extends ElasticsearchTestCase {
Map<String, StoreFileMetaData> metaDataMap = new HashMap<>();
metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[]{1})));
metaDataMap.put("_0_1.del", new StoreFileMetaData("_0_1.del", 42, "foobarbaz", null, new BytesRef()));
Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(metaDataMap);
Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(metaDataMap, Collections.EMPTY_MAP);
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
@ -1009,7 +1023,7 @@ public class StoreTest extends ElasticsearchTestCase {
final AtomicInteger count = new AtomicInteger(0);
final ShardLock lock = new DummyShardLock(shardId);
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, lock , new Store.OnClose() {
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, lock, new Store.OnClose() {
public void handle(ShardLock theLock) {
assertEquals(shardId, theLock.getShardId());
@ -1081,4 +1095,95 @@ public class StoreTest extends ElasticsearchTestCase {
return numNonExtra;
public void testMetadataSnapshotStreaming() throws Exception {
Store.MetadataSnapshot outMetadataSnapshot = createMetaDataSnapshot();
org.elasticsearch.Version targetNodeVersion = randomVersion(random());
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in);
Map<String, StoreFileMetaData> origEntries = new HashMap<>();
for (Map.Entry<String, StoreFileMetaData> entry : inMetadataSnapshot.asMap().entrySet()) {
assertThat(entry.getValue().name(), equalTo(origEntries.remove(entry.getKey()).name()));
assertThat(origEntries.size(), equalTo(0));
assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData()));
protected Store.MetadataSnapshot createMetaDataSnapshot() {
StoreFileMetaData storeFileMetaData1 = new StoreFileMetaData("segments", 1);
StoreFileMetaData storeFileMetaData2 = new StoreFileMetaData("no_segments", 1);
Map<String, StoreFileMetaData> storeFileMetaDataMap = new HashMap<>();
storeFileMetaDataMap.put(, storeFileMetaData1);
storeFileMetaDataMap.put(, storeFileMetaData2);
Map<String, String> commitUserData = new HashMap<>();
commitUserData.put("userdata_1", "test");
commitUserData.put("userdata_2", "test");
return new Store.MetadataSnapshot(storeFileMetaDataMap, commitUserData);
public void testUserDataRead() throws IOException {
final ShardId shardId = new ShardId(new Index("index"), 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, ImmutableSettings.EMPTY, directoryService, new DummyShardLock(shardId));
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS));
IndexWriter writer = new IndexWriter(, config);
Document doc = new Document();
doc.add(new TextField("id", "1", Field.Store.NO));
Map<String, String> commitData = new HashMap<>(2);
String syncId = "a sync id";
String translogId = "a translog id";
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
commitData.put(Translog.TRANSLOG_ID_KEY, translogId);
Store.MetadataSnapshot metadata;
if (randomBoolean()) {
metadata = store.getMetadata();
} else {
metadata = store.getMetadata(deletionPolicy.snapshot());
// do not check for correct files, we have enough tests for that above
assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_ID_KEY), equalTo(translogId));
assertDeleteContent(store, directoryService);
public void testStreamStoreFilesMetaData() throws Exception {
Store.MetadataSnapshot metadataSnapshot = createMetaDataSnapshot();
TransportNodesListShardStoreMetaData.StoreFilesMetaData outStoreFileMetaData = new TransportNodesListShardStoreMetaData.StoreFilesMetaData(randomBoolean(), new ShardId("test", 0),metadataSnapshot);
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
org.elasticsearch.Version targetNodeVersion = randomVersion(random());
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
TransportNodesListShardStoreMetaData.StoreFilesMetaData inStoreFileMetaData = TransportNodesListShardStoreMetaData.StoreFilesMetaData.readStoreFilesMetaData(in);
Iterator<StoreFileMetaData> outFiles = outStoreFileMetaData.iterator();
for (StoreFileMetaData inFile : inStoreFileMetaData) {
assertThat(, equalTo(;
assertThat(outStoreFileMetaData.syncId(), equalTo(inStoreFileMetaData.syncId()));

View File

@ -19,12 +19,14 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.IndexFileNames;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
@ -32,6 +34,8 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo;
@ -49,7 +53,7 @@ public class StartRecoveryRequestTest extends ElasticsearchTestCase {
new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion),
new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion),
Collections.<String, StoreFileMetaData>emptyMap(),
@ -69,11 +73,9 @@ public class StartRecoveryRequestTest extends ElasticsearchTestCase {
assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode()));
assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode()));
assertThat(outRequest.markAsRelocated(), equalTo(inRequest.markAsRelocated()));
assertThat(outRequest.existingFiles(), equalTo(inRequest.existingFiles()));
assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap()));
assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId()));
assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType()));