diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 82c6a2db6d7..61d970c5594 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -1322,9 +1322,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } public void deleteQuiet(String... files) { + ensureOpen(); + StoreDirectory directory = this.directory; for (String file : files) { try { - directory().deleteFile(file); + directory.deleteFile("Store.deleteQuiet", file); } catch (Exception ex) { // ignore :( } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 19d7a4edf93..c7aa8287875 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -30,6 +30,8 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,6 +70,27 @@ public class RecoveriesCollection { return status.recoveryId(); } + + /** + * Resets the recovery and performs a recovery restart on the currently recovering index shard + * + * @see IndexShard#performRecoveryRestart() + */ + public void resetRecovery(long id, ShardId shardId) throws IOException { + try (RecoveryRef ref = getRecoverySafe(id, shardId)) { + // instead of adding complicated state to RecoveryTarget we just flip the + // target instance when we reset a recovery, that way we have only one cleanup + // path on the RecoveryTarget and are always within the bounds of ref-counting + // which is important since we verify files are on disk etc. after we have written them etc. + RecoveryTarget status = ref.status(); + RecoveryTarget resetRecovery = status.resetRecovery(); + if (onGoingRecoveries.replace(id, status, resetRecovery) == false) { + resetRecovery.cancel("replace failed"); + throw new IllegalStateException("failed to replace recovery target"); + } + } + } + /** * gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented * to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 16cf36fcfa9..fe8c7abdb82 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -29,6 +29,7 @@ import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -78,27 +79,36 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final AtomicBoolean finished = new AtomicBoolean(); private final ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); - private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final CancellableThreads cancellableThreads; // last time this status was accessed private volatile long lastAccessTime = System.nanoTime(); private final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); - public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) { + private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor + this(copyFrom.indexShard(), copyFrom.sourceNode(), copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId()); + } + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) { + this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet()); + } + + private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener, + CancellableThreads cancellableThreads, long recoveryId) { super("recovery_status"); - this.recoveryId = idGenerator.incrementAndGet(); + this.cancellableThreads = cancellableThreads; + this.recoveryId = recoveryId; this.listener = listener; this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId()); this.indexShard = indexShard; this.sourceNode = sourceNode; this.shardId = indexShard.shardId(); - this.tempFilePrefix = RECOVERY_PREFIX + indexShard.recoveryState().getTimer().startTime() + "."; + this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + "."; this.store = indexShard.store(); - indexShard.recoveryStats().incCurrentAsTarget(); // make sure the store is not released until we are done. store.incRef(); + indexShard.recoveryStats().incCurrentAsTarget(); } public long recoveryId() { @@ -151,6 +161,21 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget store.renameTempFilesSafe(tempFileNames); } + /** + * Closes the current recovery target and returns a + * clone to reset the ongoing recovery + */ + RecoveryTarget resetRecovery() throws IOException { + ensureRefCount(); + RecoveryTarget copy = new RecoveryTarget(this); + if (finished.compareAndSet(false, true)) { + // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now + decRef(); + } + indexShard.performRecoveryRestart(); + return copy; + } + /** * cancel the recovery. calling this method will clean temporary files and release the store * unless this object is in use (in which case it will be cleaned once all ongoing users call @@ -243,15 +268,26 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget return indexOutput; } - public void resetRecovery() throws IOException { - cleanOpenFiles(); - indexShard().performRecoveryRestart(); - } - @Override protected void closeInternal() { try { - cleanOpenFiles(); + // clean open index outputs + Iterator> iterator = openIndexOutputs.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + logger.trace("closing IndexOutput file [{}]", entry.getValue()); + try { + entry.getValue().close(); + } catch (Exception e) { + logger.debug("error while closing recovery output [{}]", e, entry.getValue()); + } + iterator.remove(); + } + // trash temporary files + for (String file : tempFileNames.keySet()) { + logger.trace("cleaning temporary file [{}]", file); + store.deleteQuiet(file); + } } finally { // free store. increment happens in constructor store.decRef(); @@ -259,26 +295,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } } - protected void cleanOpenFiles() { - // clean open index outputs - Iterator> iterator = openIndexOutputs.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - logger.trace("closing IndexOutput file [{}]", entry.getValue()); - try { - entry.getValue().close(); - } catch (Exception e) { - logger.debug("error while closing recovery output [{}]", e, entry.getValue()); - } - iterator.remove(); - } - // trash temporary files - for (String file : tempFileNames.keySet()) { - logger.trace("cleaning temporary file [{}]", file); - store.deleteQuiet(file); - } - } - @Override public String toString() { return shardId + " [" + recoveryId + "]"; @@ -394,23 +410,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget indexOutput.close(); } final String temporaryFileName = getTempNameForFile(name); - assert assertTempFileExists(temporaryFileName); + assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) : + "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); store.directory().sync(Collections.singleton(temporaryFileName)); IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished } } - - private boolean assertTempFileExists(String temporaryFileName) throws IOException { - try { - assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) : - "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); - } catch (AssertionError error) { - if (finished.get() == false) { - // if we got canceled stuff might not be here anymore.. - throw error; - } - } - return true; - } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java index d54d7859089..e641d199ab4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetService.java @@ -150,7 +150,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve private void retryRecovery(final RecoveryTarget recoveryTarget, TimeValue retryAfter, final StartRecoveryRequest currentRequest) { try { - recoveryTarget.resetRecovery(); + onGoingRecoveries.resetRecovery(recoveryTarget.recoveryId(), recoveryTarget.shardId()); } catch (Exception e) { onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(currentRequest, e), true); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 2ccae5a287b..d7afd095d01 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -29,6 +29,8 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; @@ -61,6 +63,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.internal.UidFieldMapper; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.similarity.SimilarityService; @@ -106,7 +109,7 @@ import static org.hamcrest.Matchers.equalTo; public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { - private ThreadPool threadPool; + protected ThreadPool threadPool; private final Index index = new Index("test", "uuid"); private final ShardId shardId = new ShardId(index, 0); private final Map indexMapping = Collections.singletonMap("type", "{ \"type\": {} }"); @@ -160,14 +163,15 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } } - - @Before - public void setup() { + @Override + public void setUp() throws Exception { + super.setUp(); threadPool = new TestThreadPool(getClass().getName()); } - @After - public void destroy() { + @Override + public void tearDown() throws Exception { + super.tearDown(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } @@ -197,8 +201,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { return new ReplicationGroup(metaData, homePath); } - private DiscoveryNode getDiscoveryNode(String id) { - return new DiscoveryNode(id, id, LocalTransportAddress.buildUnique(), Collections.emptyMap(), + protected DiscoveryNode getDiscoveryNode(String id) { + return new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); } @@ -227,7 +231,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } - class ReplicationGroup implements AutoCloseable, Iterable { + protected class ReplicationGroup implements AutoCloseable, Iterable { private final IndexShard primary; private final List replicas; private final IndexMetaData indexMetaData; @@ -279,15 +283,21 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { replicas.add(replica); return replica; } - public void recoverReplica(IndexShard replica, BiFunction targetSupplier) throws IOException { - final DiscoveryNode pNode; - synchronized (this) { - pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); - } + recoverReplica(replica, targetSupplier, true); + } + + public void recoverReplica(IndexShard replica, BiFunction targetSupplier, + boolean markAsRecovering) + throws IOException { + final DiscoveryNode pNode = getPrimaryNode(); final DiscoveryNode rNode = getDiscoveryNode(replica.routingEntry().currentNodeId()); - replica.markAsRecovering("remote", new RecoveryState(replica.shardId(), false, RecoveryState.Type.REPLICA, pNode, rNode)); + if (markAsRecovering) { + replica.markAsRecovering("remote", new RecoveryState(replica.shardId(), false, RecoveryState.Type.REPLICA, pNode, rNode)); + } else { + assertEquals(replica.state(), IndexShardState.RECOVERING); + } replica.prepareForIndexRecovery(); RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode, @@ -299,6 +309,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry())); } + public synchronized DiscoveryNode getPrimaryNode() { + return getDiscoveryNode(primary.routingEntry().currentNodeId()); + } + public Future asyncRecoverReplica(IndexShard replica, BiFunction targetSupplier) throws IOException { FutureTask task = new FutureTask<>(() -> { @@ -375,6 +389,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { public Iterator iterator() { return Iterators.concat(replicas.iterator(), Collections.singleton(primary).iterator()); } + + public IndexShard getPrimary() { + return primary; + } } class IndexingOp extends ReplicationOperation { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java index be94f236bc9..fde14c4825c 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -36,8 +36,6 @@ import java.util.regex.Pattern; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -/** - */ public class RecoveryStatusTests extends ESSingleNodeTestCase { public void testRenameTempFiles() throws IOException { @@ -73,7 +71,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase { Set strings = Sets.newHashSet(status.store().directory().listAll()); String expectedFile = null; for (String file : strings) { - if (Pattern.compile("recovery[.]\\d+[.]foo[.]bar").matcher(file).matches()) { + if (Pattern.compile("recovery[.][\\w-]+[.]foo[.]bar").matcher(file).matches()) { expectedFile = file; break; } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index d56e1341165..bc323ecec8f 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -18,12 +18,15 @@ */ package org.elasticsearch.recovery; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -32,8 +35,12 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTargetService; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -43,7 +50,7 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; -public class RecoveriesCollectionTests extends ESSingleNodeTestCase { +public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { static final RecoveryTargetService.RecoveryListener listener = new RecoveryTargetService.RecoveryListener() { @Override public void onRecoveryDone(RecoveryState state) { @@ -57,83 +64,115 @@ public class RecoveriesCollectionTests extends ESSingleNodeTestCase { }; public void testLastAccessTimeUpdate() throws Exception { - createIndex(); - final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); - final long recoveryId = startRecovery(collection); - try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { - final long lastSeenTime = status.status().lastAccessTime(); - assertBusy(new Runnable() { - @Override - public void run() { + try (ReplicationGroup shards = createGroup(0)) { + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); + try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { + final long lastSeenTime = status.status().lastAccessTime(); + assertBusy(() -> { try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) { assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.status().lastAccessTime())); } - } - }); - } finally { - collection.cancelRecovery(recoveryId, "life"); + }); + } finally { + collection.cancelRecovery(recoveryId, "life"); + } } } - public void testRecoveryTimeout() throws InterruptedException { - createIndex(); - final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); - final AtomicBoolean failed = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(1); - final long recoveryId = startRecovery(collection, new RecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - latch.countDown(); - } + public void testRecoveryTimeout() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final AtomicBoolean failed = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), + new RecoveryTargetService.RecoveryListener() { + @Override + public void onRecoveryDone(RecoveryState state) { + latch.countDown(); + } - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - failed.set(true); - latch.countDown(); + @Override + public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + failed.set(true); + latch.countDown(); + } + }, TimeValue.timeValueMillis(100)); + try { + latch.await(30, TimeUnit.SECONDS); + assertTrue("recovery failed to timeout", failed.get()); + } finally { + collection.cancelRecovery(recoveryId, "meh"); } - }, TimeValue.timeValueMillis(100)); - try { - latch.await(30, TimeUnit.SECONDS); - assertTrue("recovery failed to timeout", failed.get()); - } finally { - collection.cancelRecovery(recoveryId, "meh"); } } public void testRecoveryCancellation() throws Exception { - createIndex(); - final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class)); - final long recoveryId = startRecovery(collection); - final long recoveryId2 = startRecovery(collection); - try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { - ShardId shardId = recoveryRef.status().shardId(); - assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test")); - assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); - } finally { - collection.cancelRecovery(recoveryId, "meh"); - collection.cancelRecovery(recoveryId2, "meh"); + try (ReplicationGroup shards = createGroup(0)) { + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); + final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); + try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { + ShardId shardId = recoveryRef.status().shardId(); + assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test")); + assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); + } finally { + collection.cancelRecovery(recoveryId, "meh"); + collection.cancelRecovery(recoveryId2, "meh"); + } } } - protected void createIndex() { - createIndex("test", - Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .build()); - ensureGreen(); + public void testResetRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + int numDocs = randomIntBetween(1, 15); + shards.indexDocs(numDocs); + final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + IndexShard shard = shards.addReplica(); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); + try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) { + final int currentAsTarget = shard.recoveryStats().currentAsTarget(); + final int referencesToStore = recovery.status().store().refCount(); + String tempFileName = recovery.status().getTempNameForFile("foobar"); + collection.resetRecovery(recoveryId, recovery.status().shardId()); + try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) { + assertNotSame(recovery.status(), resetRecovery); + assertSame(recovery.status().CancellableThreads(), resetRecovery.status().CancellableThreads()); + assertSame(recovery.status().indexShard(), resetRecovery.status().indexShard()); + assertSame(recovery.status().store(), resetRecovery.status().store()); + assertEquals(referencesToStore + 1, resetRecovery.status().store().refCount()); + assertEquals(currentAsTarget+1, shard.recoveryStats().currentAsTarget()); // we blink for a short moment... + recovery.close(); + expectThrows(ElasticsearchException.class, () -> recovery.status().store()); + assertEquals(referencesToStore, resetRecovery.status().store().refCount()); + String resetTempFileName = resetRecovery.status().getTempNameForFile("foobar"); + assertNotEquals(tempFileName, resetTempFileName); + } + assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); + } + try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) { + shards.recoverReplica(shard, (s, n) -> { + assertSame(s, resetRecovery.status().indexShard()); + return resetRecovery.status(); + }, false); + } + shards.assertAllEqual(numDocs); + assertNull("recovery is done", collection.getRecovery(recoveryId)); + } } - - long startRecovery(RecoveriesCollection collection) { - return startRecovery(collection, listener, TimeValue.timeValueMinutes(60)); + long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard shard) { + return startRecovery(collection,sourceNode, shard, listener, TimeValue.timeValueMinutes(60)); } - long startRecovery(RecoveriesCollection collection, RecoveryTargetService.RecoveryListener listener, TimeValue timeValue) { - IndicesService indexServices = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = indexServices.indexServiceSafe(resolveIndex("test")).getShardOrNull(0); - final DiscoveryNode sourceNode = new DiscoveryNode("id", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), - Version.CURRENT); + long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard indexShard, + RecoveryTargetService.RecoveryListener listener, TimeValue timeValue) { + final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId()); + indexShard.markAsRecovering("remote", new RecoveryState(indexShard.shardId(), false, RecoveryState.Type.REPLICA, sourceNode, + rNode)); + indexShard.prepareForIndexRecovery(); return collection.startRecovery(indexShard, sourceNode, listener, timeValue); } }