Currently the shard bulk request can be rejected by the write threadpool after a mapping update. In that case the thread running the mapping-update listener, which can be a transport thread, attempts to finish the request and fsync. This commit fixes the issue by forcing the finish action to run on the write threadpool. Fixes #51904.
parent 848d3bc153
commit c8ef9649e2
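The mechanism the fix relies on is the executor's notion of force execution: a bounded write pool rejects ordinary submissions once its queue is full, but a task marked for force execution is queued anyway, so the cleanup that may refresh and fsync is neither dropped nor left to run on the calling (possibly transport) thread. Below is a minimal, self-contained sketch of that idea using plain JDK classes rather than Elasticsearch's internal EsThreadPoolExecutor/AbstractRunnable; the ForceExecutionSketch class, the executeForced helper, and the pool sizes are illustrative assumptions, not the production code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ForceExecutionSketch {

    /** Ordinary submission: throws RejectedExecutionException once the bounded queue is full. */
    static void execute(ThreadPoolExecutor pool, Runnable task) {
        pool.execute(task);
    }

    /**
     * Forced submission: never rejected. Here we block until the queue has room; Elasticsearch's
     * real force-queue policy inserts the task regardless of the configured queue size.
     */
    static void executeForced(ThreadPoolExecutor pool, Runnable task) throws InterruptedException {
        pool.getQueue().put(task);
    }

    public static void main(String[] args) throws Exception {
        // One write thread, one queue slot: the same shape as the rejecting thread pool in the test below.
        ThreadPoolExecutor writePool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());

        CountDownLatch blockWorker = new CountDownLatch(1);
        execute(writePool, () -> await(blockWorker)); // occupy the single worker
        execute(writePool, () -> {});                 // fill the single queue slot

        Runnable finishRequest =
            () -> System.out.println("finishing the bulk request (refresh/fsync) on a write thread");

        try {
            execute(writePool, finishRequest);           // rejected: where the work was lost in #51904
        } catch (RejectedExecutionException e) {
            blockWorker.countDown();                     // let the pool drain
            executeForced(writePool, finishRequest);     // forced: the cleanup still runs on a write thread
        }

        writePool.shutdown();
        writePool.awaitTermination(10, TimeUnit.SECONDS);
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the production change the same effect comes from submitting an ActionRunnable whose isForceExecution() returns true, as the diff below shows, so a rejection of the bulk continuation can no longer leave the refresh/fsync on the listener thread.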
TransportShardBulkAction.java
@@ -168,16 +168,29 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             @Override
             public void onRejection(Exception e) {
-                // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
-                while (context.hasMoreOperationsToExecute()) {
-                    context.setRequestToExecute(context.getCurrent());
-                    final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
-                    onComplete(
-                        exceptionToResult(
-                            e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
-                        context, null);
-                }
-                finishRequest();
+                // We must finish the outstanding request. Finishing the outstanding request can include
+                // refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
+                executor.execute(new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {
+
+                    @Override
+                    protected void doRun() {
+                        // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
+                        while (context.hasMoreOperationsToExecute()) {
+                            context.setRequestToExecute(context.getCurrent());
+                            final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
+                            onComplete(
+                                exceptionToResult(
+                                    e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
+                                context, null);
+                        }
+                        finishRequest();
+                    }
+
+                    @Override
+                    public boolean isForceExecution() {
+                        return true;
+                    }
+                });
             }
 
             private void finishRequest() {
TransportShardBulkActionTests.java
@@ -40,6 +40,7 @@ import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
@@ -53,13 +54,18 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.Matchers.arrayWithSize;
@@ -818,6 +824,105 @@ public class TransportShardBulkActionTests extends IndexShardTestCase
         latch.await();
     }
 
+    public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
+        TestThreadPool rejectingThreadPool = new TestThreadPool(
+            "TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
+            Settings.builder()
+                .put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1)
+                .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1)
+                .build());
+        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+        rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
+            try {
+                cyclicBarrier.await();
+                logger.info("blocking the write executor");
+                cyclicBarrier.await();
+                logger.info("unblocked the write executor");
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            cyclicBarrier.await();
+            // Place a task in the queue to block next enqueue
+            rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});
+
+            BulkItemRequest[] items = new BulkItemRequest[2];
+            DocWriteRequest<IndexRequest> writeRequest1 = new IndexRequest("index").id("id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", 1);
+            DocWriteRequest<IndexRequest> writeRequest2 = new IndexRequest("index").id("id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+            items[0] = new BulkItemRequest(0, writeRequest1);
+            items[1] = new BulkItemRequest(1, writeRequest2);
+            BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
+
+            Engine.IndexResult mappingUpdate =
+                new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
+            Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
+            Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
+            Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1);
+            Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2);
+
+            IndexShard shard = mock(IndexShard.class);
+            when(shard.shardId()).thenReturn(shardId);
+            when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+                .thenReturn(success1, mappingUpdate, success2);
+            when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod();
+            when(shard.mapperService()).thenReturn(mock(MapperService.class));
+
+            randomlySetIgnoredPrimaryResponse(items[0]);
+
+            AtomicInteger updateCalled = new AtomicInteger();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            TransportShardBulkAction.performOnPrimary(
+                bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, type, listener) -> {
+                    // There should indeed be a mapping update
+                    assertNotNull(update);
+                    updateCalled.incrementAndGet();
+                    listener.onResponse(null);
+                    try {
+                        // Release blocking task now that the continue write execution has been rejected and
+                        // the finishRequest execution has been force enqueued
+                        cyclicBarrier.await();
+                    } catch (InterruptedException | BrokenBarrierException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }, listener -> listener.onResponse(null), new LatchedActionListener<>(
+                    ActionTestUtils.assertNoFailureListener(result ->
+                        // Assert that we still need to fsync the location that was successfully written
+                        assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
+                            equalTo(resultLocation1))), latch),
+                rejectingThreadPool);
+            latch.await();
+
+            assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
+
+            verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+            BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
+            assertThat(primaryResponse1.getItemId(), equalTo(0));
+            assertThat(primaryResponse1.getId(), equalTo("id"));
+            assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
+            assertFalse(primaryResponse1.isFailed());
+            assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
+            assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));
+
+            BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
+            assertThat(primaryResponse2.getItemId(), equalTo(1));
+            assertThat(primaryResponse2.getId(), equalTo("id"));
+            assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
+            assertTrue(primaryResponse2.isFailed());
+            assertNull(primaryResponse2.getResponse());
+            assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS);
+            assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class));
+
+            closeShards(shard);
+        } finally {
+            rejectingThreadPool.shutdownNow();
+        }
+    }
+
     private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
         if (randomBoolean()) {
             // add a response to the request and thereby check that it is ignored for the primary.