Never release store using CancellableThreads (#45409)

Today we can release a Store using CancellableThreads. If we are holding
the last reference, then we will verify the node lock before deleting
the store. Checking node lock performs some I/O on FileChannel. If the
current thread is interrupted, then the channel will be closed and the
node lock will also be invalid.

Closes #45237
This commit is contained in:
Nhat Nguyen 2019-08-21 18:16:58 -04:00
parent 9b180314e3
commit 3029887451
3 changed files with 44 additions and 2 deletions

View File

@ -112,6 +112,11 @@ public class PeerRecoverySourceService implements IndexEventListener {
}
}
// exposed for testing
final int numberOfOngoingRecoveries() {
return ongoingRecoveries.ongoingRecoveries.size();
}
final class OngoingRecoveries {
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();

View File

@ -33,7 +33,9 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -232,8 +234,7 @@ public class RecoverySourceHandler {
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
shard.store().incRef();
final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
try {
@ -396,6 +397,25 @@ public class RecoverySourceHandler {
});
}
/**
* Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool.
* We must never release the store using an interruptible thread as we can risk invalidating the node lock.
*/
private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
threadPool.generic().execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() {
store.decRef();
listener.onResponse(null);
}
});
FutureUtils.get(future);
});
}
static final class SendFileResult {
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;

View File

@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.replication.ReplicationResponse;
@ -1488,4 +1489,20 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
ensureGreen(indexName);
}
public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
internalCluster().startMasterOnlyNode();
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
.setWaitForActiveShards(ActiveShardCount.NONE));
internalCluster().startNode();
internalCluster().startNode();
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertAcked(client().admin().indices().prepareDelete("test")); // cancel recoveries
assertBusy(() -> {
for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) {
assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0));
}
});
}
}