Move `reset recovery` into RecoveriesCollection (#19466)

Today, when we reset a recovery because the source is not ready or the shard is
being removed on the source (for whatever reason), we wipe all temp files and
reset the recovery without respecting any reference counting or locking: all
streams are closed and files are wiped. This is problematic because we assert,
for instance, that a file is on disk once we finish writing it, and these
assertions no longer hold if the temp files are wiped concurrently.

This change moves the logic out of RecoveryTarget into RecoveriesCollection, which
instead clones the RecoveryTarget on reset, allowing in-flight operations to finish
gracefully. We now have a single cleanup path in RecoveryTarget and can safely use
assertions in the class, since files are only removed once the recovery is canceled,
failed, or finished.

Closes #19473
Simon Willnauer, 2016-07-19 10:23:02 +02:00 (committed by GitHub)
parent 37e20c6f34
commit 5b07f81fcf
7 changed files with 207 additions and 123 deletions
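At its core, the fix swaps a fresh clone in for the old recovery target and lets
reference counting retire the old instance. A minimal standalone sketch of that
pattern, assuming a simplified ref-counted target; the names (Target, Targets) are
hypothetical stand-ins, not the Elasticsearch classes:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    class Target {
        private final AtomicInteger refCount = new AtomicInteger(1); // starts with one "initial" reference
        final long id;

        Target(long id) { this.id = id; }

        void incRef() { refCount.incrementAndGet(); }

        void decRef() {
            if (refCount.decrementAndGet() == 0) {
                // single cleanup path: close streams and delete temp files here, and only here
            }
        }
    }

    class Targets {
        private final ConcurrentMap<Long, Target> ongoing = new ConcurrentHashMap<>();

        void reset(long id) {
            Target current = ongoing.get(id);
            Target replacement = new Target(current.id); // the clone keeps the same id
            // replace() is a compare-and-set: it fails if a concurrent cancel or
            // failure already removed or swapped the old target
            if (ongoing.replace(id, current, replacement)) {
                current.decRef(); // drop the initial ref; cleanup runs once in-flight users finish
            } else {
                replacement.decRef(); // roll back the clone
                throw new IllegalStateException("failed to replace recovery target");
            }
        }
    }

Because in-flight operations each hold their own reference, the old target's files
are only wiped after the last of them calls decRef(), which is exactly what makes
the on-disk assertions safe.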

Store.java

@@ -1322,9 +1322,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, RefCounted {
     }
 
     public void deleteQuiet(String... files) {
+        ensureOpen();
+        StoreDirectory directory = this.directory;
         for (String file : files) {
             try {
-                directory().deleteFile(file);
+                directory.deleteFile("Store.deleteQuiet", file);
             } catch (Exception ex) {
                 // ignore :(
             }

RecoveriesCollection.java

@@ -30,6 +30,8 @@ import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -68,6 +70,27 @@ public class RecoveriesCollection {
         return status.recoveryId();
     }
 
+    /**
+     * Resets the recovery and performs a recovery restart on the currently recovering index shard
+     *
+     * @see IndexShard#performRecoveryRestart()
+     */
+    public void resetRecovery(long id, ShardId shardId) throws IOException {
+        try (RecoveryRef ref = getRecoverySafe(id, shardId)) {
+            // instead of adding complicated state to RecoveryTarget we just flip the
+            // target instance when we reset a recovery, that way we have only one cleanup
+            // path on the RecoveryTarget and are always within the bounds of ref-counting
+            // which is important since we verify files are on disk etc. after we have written them etc.
+            RecoveryTarget status = ref.status();
+            RecoveryTarget resetRecovery = status.resetRecovery();
+            if (onGoingRecoveries.replace(id, status, resetRecovery) == false) {
+                resetRecovery.cancel("replace failed");
+                throw new IllegalStateException("failed to replace recovery target");
+            }
+        }
+    }
+
     /**
      * gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented
     * to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically
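The javadoc above describes the acquire/release contract; RecoveryRef is
AutoCloseable, so callers typically hold the reference in try-with-resources. A
hedged usage sketch (the collection, recoveryId, and shardId variables are assumed
to exist; the method names follow the diff):

    // getRecoverySafe() increments the target's ref count, so temp files cannot
    // be wiped while this block runs; close() releases the reference again.
    try (RecoveriesCollection.RecoveryRef ref = collection.getRecoverySafe(recoveryId, shardId)) {
        RecoveryTarget target = ref.status();
        // ... write recovery files; assertions about on-disk state are safe here ...
    } // cleanup happens only when the overall ref count reaches zero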

RecoveryTarget.java

@@ -29,6 +29,7 @@ import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;

@@ -78,27 +79,36 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
     private final AtomicBoolean finished = new AtomicBoolean();
 
     private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
-    private final CancellableThreads cancellableThreads = new CancellableThreads();
+    private final CancellableThreads cancellableThreads;
 
     // last time this status was accessed
     private volatile long lastAccessTime = System.nanoTime();
 
     private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
 
-    public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) {
+    private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor
+        this(copyFrom.indexShard(), copyFrom.sourceNode(), copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId());
+    }
+
+    public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) {
+        this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet());
+    }
+
+    private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener,
+                           CancellableThreads cancellableThreads, long recoveryId) {
         super("recovery_status");
-        this.recoveryId = idGenerator.incrementAndGet();
+        this.cancellableThreads = cancellableThreads;
+        this.recoveryId = recoveryId;
         this.listener = listener;
         this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId());
         this.indexShard = indexShard;
         this.sourceNode = sourceNode;
         this.shardId = indexShard.shardId();
-        this.tempFilePrefix = RECOVERY_PREFIX + indexShard.recoveryState().getTimer().startTime() + ".";
+        this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + ".";
         this.store = indexShard.store();
-        indexShard.recoveryStats().incCurrentAsTarget();
         // make sure the store is not released until we are done.
         store.incRef();
+        indexShard.recoveryStats().incCurrentAsTarget();
     }
 
     public long recoveryId() {

@@ -151,6 +161,21 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
         store.renameTempFilesSafe(tempFileNames);
     }
 
+    /**
+     * Closes the current recovery target and returns a
+     * clone to reset the ongoing recovery
+     */
+    RecoveryTarget resetRecovery() throws IOException {
+        ensureRefCount();
+        RecoveryTarget copy = new RecoveryTarget(this);
+        if (finished.compareAndSet(false, true)) {
+            // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
+            decRef();
+        }
+        indexShard.performRecoveryRestart();
+        return copy;
+    }
+
     /**
      * cancel the recovery. calling this method will clean temporary files and release the store
      * unless this object is in use (in which case it will be cleaned once all ongoing users call

@@ -243,15 +268,26 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
         return indexOutput;
     }
 
-    public void resetRecovery() throws IOException {
-        cleanOpenFiles();
-        indexShard().performRecoveryRestart();
-    }
-
     @Override
     protected void closeInternal() {
         try {
-            cleanOpenFiles();
+            // clean open index outputs
+            Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, IndexOutput> entry = iterator.next();
+                logger.trace("closing IndexOutput file [{}]", entry.getValue());
+                try {
+                    entry.getValue().close();
+                } catch (Exception e) {
+                    logger.debug("error while closing recovery output [{}]", e, entry.getValue());
+                }
+                iterator.remove();
+            }
+            // trash temporary files
+            for (String file : tempFileNames.keySet()) {
+                logger.trace("cleaning temporary file [{}]", file);
+                store.deleteQuiet(file);
+            }
         } finally {
             // free store. increment happens in constructor
             store.decRef();

@@ -259,26 +295,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
         }
     }
 
-    protected void cleanOpenFiles() {
-        // clean open index outputs
-        Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<String, IndexOutput> entry = iterator.next();
-            logger.trace("closing IndexOutput file [{}]", entry.getValue());
-            try {
-                entry.getValue().close();
-            } catch (Exception e) {
-                logger.debug("error while closing recovery output [{}]", e, entry.getValue());
-            }
-            iterator.remove();
-        }
-        // trash temporary files
-        for (String file : tempFileNames.keySet()) {
-            logger.trace("cleaning temporary file [{}]", file);
-            store.deleteQuiet(file);
-        }
-    }
-
     @Override
     public String toString() {
         return shardId + " [" + recoveryId + "]";

@@ -394,23 +410,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
                 indexOutput.close();
             }
             final String temporaryFileName = getTempNameForFile(name);
-            assert assertTempFileExists(temporaryFileName);
+            assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) :
+                "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll());
             store.directory().sync(Collections.singleton(temporaryFileName));
             IndexOutput remove = removeOpenIndexOutputs(name);
             assert remove == null || remove == indexOutput; // remove maybe null if we got finished
         }
     }
 
-    private boolean assertTempFileExists(String temporaryFileName) throws IOException {
-        try {
-            assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) :
-                "expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll());
-        } catch (AssertionError error) {
-            if (finished.get() == false) {
-                // if we got canceled stuff might not be here anymore..
-                throw error;
-            }
-        }
-        return true;
-    }
 }
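RecoveryTarget extends AbstractRefCounted, so closeInternal() above runs exactly
once, when the last reference is released. A minimal lifecycle sketch, assuming
Elasticsearch's AbstractRefCounted base class; the DemoTarget subclass is
hypothetical:

    import org.elasticsearch.common.util.concurrent.AbstractRefCounted;

    class DemoTarget extends AbstractRefCounted {
        DemoTarget() {
            super("demo_target"); // name used in error messages; the object starts with one reference
        }

        @Override
        protected void closeInternal() {
            // runs once, when the ref count drops to zero:
            // close index outputs, delete temp files, release the store
        }
    }

    // DemoTarget t = new DemoTarget(); // refCount == 1
    // t.incRef();                      // an in-flight file write takes a reference
    // t.decRef();                      // the write finishes
    // t.decRef();                      // initial reference released -> closeInternal() runs

This is why resetRecovery() only needs finished.compareAndSet(false, true) plus a
single decRef(): any concurrent writers still holding references delay the cleanup
rather than racing with it.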

RecoveryTargetService.java

@@ -150,7 +150,7 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEventListener {
     private void retryRecovery(final RecoveryTarget recoveryTarget, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
         try {
-            recoveryTarget.resetRecovery();
+            onGoingRecoveries.resetRecovery(recoveryTarget.recoveryId(), recoveryTarget.shardId());
         } catch (Exception e) {
             onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(currentRequest, e), true);
         }

ESIndexLevelReplicationTestCase.java

@@ -29,6 +29,8 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.index.TransportIndexAction;

@@ -61,6 +63,7 @@ import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.internal.UidFieldMapper;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
 import org.elasticsearch.index.similarity.SimilarityService;

@@ -106,7 +109,7 @@ import static org.hamcrest.Matchers.equalTo;
 public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
 
-    private ThreadPool threadPool;
+    protected ThreadPool threadPool;
     private final Index index = new Index("test", "uuid");
     private final ShardId shardId = new ShardId(index, 0);
     private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");

@@ -160,14 +163,15 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
         }
     }
 
-    @Before
-    public void setup() {
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
         threadPool = new TestThreadPool(getClass().getName());
     }
 
-    @After
-    public void destroy() {
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
         ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
     }

@@ -197,8 +201,8 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
         return new ReplicationGroup(metaData, homePath);
     }
 
-    private DiscoveryNode getDiscoveryNode(String id) {
-        return new DiscoveryNode(id, id, LocalTransportAddress.buildUnique(), Collections.emptyMap(),
+    protected DiscoveryNode getDiscoveryNode(String id) {
+        return new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(),
             Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT);
     }

@@ -227,7 +231,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
     }
 
-    class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
+    protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
         private final IndexShard primary;
         private final List<IndexShard> replicas;
         private final IndexMetaData indexMetaData;

@@ -279,15 +283,21 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             replicas.add(replica);
             return replica;
         }
 
         public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
             throws IOException {
-            final DiscoveryNode pNode;
-            synchronized (this) {
-                pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
-            }
+            recoverReplica(replica, targetSupplier, true);
+        }
+
+        public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
+                                   boolean markAsRecovering)
+            throws IOException {
+            final DiscoveryNode pNode = getPrimaryNode();
             final DiscoveryNode rNode = getDiscoveryNode(replica.routingEntry().currentNodeId());
-            replica.markAsRecovering("remote", new RecoveryState(replica.shardId(), false, RecoveryState.Type.REPLICA, pNode, rNode));
+            if (markAsRecovering) {
+                replica.markAsRecovering("remote", new RecoveryState(replica.shardId(), false, RecoveryState.Type.REPLICA, pNode, rNode));
+            } else {
+                assertEquals(replica.state(), IndexShardState.RECOVERING);
+            }
             replica.prepareForIndexRecovery();
             RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
             StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), pNode, rNode,

@@ -299,6 +309,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             replica.updateRoutingEntry(ShardRoutingHelper.moveToStarted(replica.routingEntry()));
         }
 
+        public synchronized DiscoveryNode getPrimaryNode() {
+            return getDiscoveryNode(primary.routingEntry().currentNodeId());
+        }
+
         public Future<Void> asyncRecoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
             throws IOException {
             FutureTask<Void> task = new FutureTask<>(() -> {

@@ -375,6 +389,10 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
         public Iterator<IndexShard> iterator() {
             return Iterators.<IndexShard>concat(replicas.iterator(), Collections.singleton(primary).iterator());
         }
 
+        public IndexShard getPrimary() {
+            return primary;
+        }
+
     }
 
     class IndexingOp extends ReplicationOperation<IndexRequest, IndexRequest, IndexingResult> {

RecoveryStatusTests.java

@@ -36,8 +36,6 @@ import java.util.regex.Pattern;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 
-/**
- */
 public class RecoveryStatusTests extends ESSingleNodeTestCase {
 
     public void testRenameTempFiles() throws IOException {

@@ -73,7 +71,7 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
         Set<String> strings = Sets.newHashSet(status.store().directory().listAll());
         String expectedFile = null;
         for (String file : strings) {
-            if (Pattern.compile("recovery[.]\\d+[.]foo[.]bar").matcher(file).matches()) {
+            if (Pattern.compile("recovery[.][\\w-]+[.]foo[.]bar").matcher(file).matches()) {
                 expectedFile = file;
                 break;
             }
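The widened pattern tracks the RecoveryTarget change above: the temp-file prefix now
embeds a base64 UUID (whose characters are matched by [\w-]+) instead of the numeric
recovery start time, so a temp file now looks like, say,
recovery.zE9vMeXtQJqcVRCrdIcLiQ.foo.bar rather than recovery.1468916582000.foo.bar
(example names illustrative).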

RecoveriesCollectionTests.java

@@ -18,12 +18,15 @@
  */
 package org.elasticsearch.recovery;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;

@@ -32,8 +35,12 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.recovery.RecoveryTargetService;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -43,7 +50,7 @@ import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThan;
 
-public class RecoveriesCollectionTests extends ESSingleNodeTestCase {
+public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
     static final RecoveryTargetService.RecoveryListener listener = new RecoveryTargetService.RecoveryListener() {
         @Override
         public void onRecoveryDone(RecoveryState state) {

@@ -57,83 +64,115 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase {
     };
 
     public void testLastAccessTimeUpdate() throws Exception {
-        createIndex();
-        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
-        final long recoveryId = startRecovery(collection);
-        try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
-            final long lastSeenTime = status.status().lastAccessTime();
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) {
-                        assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.status().lastAccessTime()));
-                    }
-                }
-            });
-        } finally {
-            collection.cancelRecovery(recoveryId, "life");
+        try (ReplicationGroup shards = createGroup(0)) {
+            final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
+            final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
+            try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) {
+                final long lastSeenTime = status.status().lastAccessTime();
+                assertBusy(() -> {
+                    try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) {
+                        assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.status().lastAccessTime()));
+                    }
+                });
+            } finally {
+                collection.cancelRecovery(recoveryId, "life");
+            }
         }
     }
 
-    public void testRecoveryTimeout() throws InterruptedException {
-        createIndex();
-        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
-        final AtomicBoolean failed = new AtomicBoolean();
-        final CountDownLatch latch = new CountDownLatch(1);
-        final long recoveryId = startRecovery(collection, new RecoveryTargetService.RecoveryListener() {
-            @Override
-            public void onRecoveryDone(RecoveryState state) {
-                latch.countDown();
-            }
-
-            @Override
-            public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
-                failed.set(true);
-                latch.countDown();
-            }
-        }, TimeValue.timeValueMillis(100));
-        try {
-            latch.await(30, TimeUnit.SECONDS);
-            assertTrue("recovery failed to timeout", failed.get());
-        } finally {
-            collection.cancelRecovery(recoveryId, "meh");
+    public void testRecoveryTimeout() throws Exception {
+        try (ReplicationGroup shards = createGroup(0)) {
+            final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
+            final AtomicBoolean failed = new AtomicBoolean();
+            final CountDownLatch latch = new CountDownLatch(1);
+            final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(),
+                new RecoveryTargetService.RecoveryListener() {
+                    @Override
+                    public void onRecoveryDone(RecoveryState state) {
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
+                        failed.set(true);
+                        latch.countDown();
+                    }
+                }, TimeValue.timeValueMillis(100));
+            try {
+                latch.await(30, TimeUnit.SECONDS);
+                assertTrue("recovery failed to timeout", failed.get());
            } finally {
+                collection.cancelRecovery(recoveryId, "meh");
+            }
         }
     }
 
     public void testRecoveryCancellation() throws Exception {
-        createIndex();
-        final RecoveriesCollection collection = new RecoveriesCollection(logger, getInstanceFromNode(ThreadPool.class));
-        final long recoveryId = startRecovery(collection);
-        final long recoveryId2 = startRecovery(collection);
-        try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
-            ShardId shardId = recoveryRef.status().shardId();
-            assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
-            assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
-        } finally {
-            collection.cancelRecovery(recoveryId, "meh");
-            collection.cancelRecovery(recoveryId2, "meh");
+        try (ReplicationGroup shards = createGroup(0)) {
+            final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
+            final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
+            final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica());
+            try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) {
+                ShardId shardId = recoveryRef.status().shardId();
+                assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test"));
+                assertThat("all recoveries should be cancelled", collection.size(), equalTo(0));
+            } finally {
+                collection.cancelRecovery(recoveryId, "meh");
+                collection.cancelRecovery(recoveryId2, "meh");
+            }
         }
     }
 
-    protected void createIndex() {
-        createIndex("test",
-            Settings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
-                .build());
-        ensureGreen();
+    public void testResetRecovery() throws Exception {
+        try (ReplicationGroup shards = createGroup(0)) {
+            shards.startAll();
+            int numDocs = randomIntBetween(1, 15);
+            shards.indexDocs(numDocs);
+            final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool);
+            IndexShard shard = shards.addReplica();
+            final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard);
+            try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) {
+                final int currentAsTarget = shard.recoveryStats().currentAsTarget();
+                final int referencesToStore = recovery.status().store().refCount();
+                String tempFileName = recovery.status().getTempNameForFile("foobar");
+                collection.resetRecovery(recoveryId, recovery.status().shardId());
+                try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) {
+                    assertNotSame(recovery.status(), resetRecovery);
+                    assertSame(recovery.status().CancellableThreads(), resetRecovery.status().CancellableThreads());
+                    assertSame(recovery.status().indexShard(), resetRecovery.status().indexShard());
+                    assertSame(recovery.status().store(), resetRecovery.status().store());
+                    assertEquals(referencesToStore + 1, resetRecovery.status().store().refCount());
+                    assertEquals(currentAsTarget + 1, shard.recoveryStats().currentAsTarget()); // we blink for a short moment...
+                    recovery.close();
+                    expectThrows(ElasticsearchException.class, () -> recovery.status().store());
+                    assertEquals(referencesToStore, resetRecovery.status().store().refCount());
+                    String resetTempFileName = resetRecovery.status().getTempNameForFile("foobar");
+                    assertNotEquals(tempFileName, resetTempFileName);
+                }
+                assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget());
+            }
+            try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) {
+                shards.recoverReplica(shard, (s, n) -> {
+                    assertSame(s, resetRecovery.status().indexShard());
+                    return resetRecovery.status();
+                }, false);
+            }
+            shards.assertAllEqual(numDocs);
+            assertNull("recovery is done", collection.getRecovery(recoveryId));
+        }
     }
 
-    long startRecovery(RecoveriesCollection collection) {
-        return startRecovery(collection, listener, TimeValue.timeValueMinutes(60));
+    long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard shard) {
+        return startRecovery(collection, sourceNode, shard, listener, TimeValue.timeValueMinutes(60));
     }
 
-    long startRecovery(RecoveriesCollection collection, RecoveryTargetService.RecoveryListener listener, TimeValue timeValue) {
-        IndicesService indexServices = getInstanceFromNode(IndicesService.class);
-        IndexShard indexShard = indexServices.indexServiceSafe(resolveIndex("test")).getShardOrNull(0);
-        final DiscoveryNode sourceNode = new DiscoveryNode("id", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(),
-            Version.CURRENT);
+    long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard indexShard,
+                       RecoveryTargetService.RecoveryListener listener, TimeValue timeValue) {
+        final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId());
+        indexShard.markAsRecovering("remote", new RecoveryState(indexShard.shardId(), false, RecoveryState.Type.REPLICA, sourceNode,
+            rNode));
+        indexShard.prepareForIndexRecovery();
         return collection.startRecovery(indexShard, sourceNode, listener, timeValue);
     }
 }