Ensure sendBatch not called recursively (#39988)

This PR introduces AsyncRecoveryTarget, which executes the remote calls of
peer recovery asynchronously. It also adds a new assertion ensuring that
sendBatch, the method that sends a batch of history operations in phase2,
is never called recursively on the same thread. The same assertion will
also be used in sendFileChunks.
Nhat Nguyen 2019-03-13 12:10:14 -04:00
parent 7325b2a3a7
commit d720a64b9e
8 changed files with 187 additions and 40 deletions
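The sketch below is illustrative only and is not part of this commit (SendLoopSketch and its members are invented names). It shows why forking the target's reply onto an executor, as AsyncRecoveryTarget does, keeps a callback-driven loop like sendBatch from re-entering itself on the calling thread: an inline reply stacks the next call on top of the current frame, while a dispatched reply lets the current frame unwind first.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SendLoopSketch {

    // Sends one "batch" and continues with the next one when the reply arrives. With an
    // inline reply the continuation runs on the current thread, so sendBatch sits on the
    // stack once per batch (the recursion the new assertion catches). With a forked reply
    // each continuation starts from a fresh stack frame.
    static void sendBatch(Iterator<List<Integer>> batches, boolean replyInline,
                          ExecutorService executor, Runnable onDone) {
        if (batches.hasNext() == false) {
            onDone.run();
            return;
        }
        List<Integer> batch = batches.next();
        System.out.println("sent batch " + batch); // pretend this batch was shipped to the target
        Runnable continuation = () -> sendBatch(batches, replyInline, executor, onDone);
        if (replyInline) {
            continuation.run();             // same thread: sendBatch -> sendBatch -> ...
        } else {
            executor.execute(continuation); // reply delivered asynchronously, stack unwinds first
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        Iterator<List<Integer>> batches = List.of(List.of(1, 2), List.of(3), List.of(4, 5)).iterator();
        sendBatch(batches, false, executor, done::countDown);
        done.await(10, TimeUnit.SECONDS);
        executor.shutdown();
    }
}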

View File

@@ -584,6 +584,7 @@ public class RecoverySourceHandler {
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {

View File

@@ -786,4 +786,18 @@ public class ThreadPool implements Scheduler, Closeable {
"Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
return true;
}
public static boolean assertCurrentMethodIsNotCalledRecursively() {
final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
// frame 0 is getStackTrace, frame 1 is this helper, frame 2 is the method being checked
assert stackTraceElements.length >= 3 : stackTraceElements.length;
assert stackTraceElements[0].getMethodName().equals("getStackTrace") : stackTraceElements[0];
assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively") : stackTraceElements[1];
final StackTraceElement testingMethod = stackTraceElements[2];
// any deeper frame with the same class and method name means the checked method re-entered itself on this thread
for (int i = 3; i < stackTraceElements.length; i++) {
assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false
|| stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false :
testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively";
}
return true;
}
}
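For context on how the helper above is meant to be invoked, here is a minimal sketch (sendFileChunksSketch and its body are hypothetical, not code from this commit): the guard sits behind the assert keyword, so the stack-trace walk only runs when assertions are enabled, as in the sendBatch hunk above, and the summary notes the same guard will also be used in sendFileChunks.

import org.elasticsearch.threadpool.ThreadPool;

class RecursionGuardSketch {

    // Hypothetical callback-driven sender: under -ea the assertion trips if a reply is
    // delivered inline and this method ends up calling itself on the same thread.
    void sendFileChunksSketch() {
        assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
        // ... send the next chunk; the reply callback must be dispatched on another thread
        // before it triggers the next sendFileChunksSketch call ...
    }
}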

View File

@@ -31,14 +31,13 @@ import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,10 +46,6 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
@@ -58,16 +53,12 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
@Override
protected ThreadPool setUpThreadPool() {
final ThreadPool threadPool = mock(ThreadPool.class);
doAnswer(invocationOnMock -> currentTimeMillis.get()).when(threadPool).absoluteTimeInMillis();
when(threadPool.executor(anyString())).thenReturn(mock(ExecutorService.class));
when(threadPool.scheduler()).thenReturn(mock(ScheduledExecutorService.class));
return threadPool;
}
@Override
protected void tearDownThreadPool() {
}
@Override
protected ThreadPool setUpThreadPool() {
return new TestThreadPool(getClass().getName(), threadPoolSettings()) {
@Override
public long absoluteTimeInMillis() {
return currentTimeMillis.get();
}
};
}
public void testAddOrRenewRetentionLease() throws IOException {

View File

@@ -2443,7 +2443,13 @@ public class IndexShardTests extends IndexShardTestCase {
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
ActionListener.wrap(
r -> {
assertFalse(replica.isSyncNeeded());
listener.onResponse(r);
},
listener::onFailure
));
}
}, true, true);
@@ -2604,8 +2610,12 @@ public class IndexShardTests extends IndexShardTestCase {
// we're only checking that listeners are called when the engine is open; before that, there is no point
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}
@Override
@@ -2622,15 +2632,21 @@ public class IndexShardTests extends IndexShardTestCase {
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.map(listener, checkpoint -> {
assertListenerCalled.accept(replica);
return checkpoint;
}));
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
super.finalizeRecovery(globalCheckpoint,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
listener.onResponse(r);
}, listener::onFailure));
}
}, false, true);

View File

@@ -240,10 +240,10 @@ public class RecoverySourceHandlerTests extends ESTestCase {
RetentionLeases retentionLeases, ActionListener<Long> listener) {
shippedOps.addAll(operations);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
listener.onResponse(checkpointOnTarget.get());
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler handler = new RecoverySourceHandler(
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
@@ -274,14 +274,15 @@ public class RecoverySourceHandlerTests extends ESTestCase {
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
if (randomBoolean()) {
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
} else {
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
listener.onFailure(new RuntimeException("test - failed to index"));
wasFailed.set(true);
}
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
RecoverySourceHandler handler = new RecoverySourceHandler(
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
@@ -761,12 +762,4 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
};
}
private void maybeExecuteAsync(Runnable runnable) {
if (randomBoolean()) {
threadPool.generic().execute(runnable);
} else {
runnable.run();
}
}
}

View File

@@ -20,9 +20,14 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
import static org.hamcrest.CoreMatchers.equalTo;
public class ThreadPoolTests extends ESTestCase {
@@ -67,4 +72,35 @@ public class ThreadPoolTests extends ESTestCase {
Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
}
int factorial(int n) {
assertCurrentMethodIsNotCalledRecursively();
if (n <= 1) {
return 1;
} else {
return n * factorial(n - 1);
}
}
int factorialForked(int n, ExecutorService executor) {
assertCurrentMethodIsNotCalledRecursively();
if (n <= 1) {
return 1;
}
return n * FutureUtils.get(executor.submit(() -> factorialForked(n - 1, executor)));
}
public void testAssertCurrentMethodIsNotCalledRecursively() {
expectThrows(AssertionError.class, () -> factorial(between(2, 10)));
assertThat(factorial(1), equalTo(1)); // is not called recursively
assertThat(expectThrows(AssertionError.class, () -> factorial(between(2, 10))).getMessage(),
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorial is called recursively"));
TestThreadPool threadPool = new TestThreadPool("test");
assertThat(factorialForked(1, threadPool.generic()), equalTo(1));
assertThat(factorialForked(10, threadPool.generic()), equalTo(3628800));
assertThat(expectThrows(AssertionError.class,
() -> factorialForked(between(2, 10), EsExecutors.newDirectExecutorService())).getMessage(),
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorialForked is called recursively"));
terminate(threadPool);
}
}

View File

@@ -67,6 +67,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.recovery.AsyncRecoveryTarget;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryResponse;
@@ -629,8 +630,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
final RecoverySourceHandler recovery = new RecoverySourceHandler(
primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());

View File

@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
/**
* Wraps a {@link RecoveryTarget} so that all remote calls are executed asynchronously on the provided {@code executor}.
*/
public class AsyncRecoveryTarget implements RecoveryTargetHandler {
private final RecoveryTargetHandler target;
private final Executor executor;
public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) {
this.executor = executor;
this.target = target;
}
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
target.ensureClusterStateVersion(clusterStateVersion);
}
@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener));
}
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener));
}
@Override
public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {
target.handoffPrimaryContext(primaryContext);
}
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary,
RetentionLeases retentionLeases, ActionListener<Long> listener) {
executor.execute(() -> target.indexTranslogOperations(
operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, listener));
}
@Override
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
}
@Override
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
target.cleanFiles(totalTranslogOps, sourceMetaData);
}
@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
// TODO: remove this clone once we send file chunk async
final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef()));
executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener));
}
}
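A condensed usage sketch based on the test changes above (not a standalone file; recoveryTarget, shard, request, fileChunkSizeInBytes, threadPool and the phase2 arguments stand in for values the surrounding tests already build): wrapping the synchronous test handler makes every reply arrive on another thread, which is what lets the new sendBatch assertion hold.

// Fragment mirroring RecoverySourceHandlerTests in this commit; variable names are placeholders.
RecoveryTargetHandler asyncTarget = new AsyncRecoveryTarget(recoveryTarget, threadPool.generic());
RecoverySourceHandler handler = new RecoverySourceHandler(
    shard, asyncTarget, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,
    maxSeqNoOfUpdatesOrDeletes, RetentionLeases.EMPTY, future); // replies are now forked onto the generic pool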