From 48528a0c9858f06c0daf7dc88cc9cecfe88a3654 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 7 Nov 2022 19:57:34 -0800 Subject: [PATCH] MSQ: Fix task lock checking during publish, fix lock priority. (#13282) * MSQ: Fix task lock checking during publish, fix lock priority. Fixes two issues: 1) ControllerImpl did not properly check the return value of SegmentTransactionalInsertAction when doing a REPLACE. This could cause it to not realize that its locks were preempted. 2) Task lock priority was the default of 0. It should be the higher batch default of 50. The low priority made it possible for MSQ tasks to be preempted by compaction tasks, which is not desired. * Restructuring, add docs. * Add performSegmentPublish tests. * Fix tests. --- docs/ingestion/tasks.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 61 ++++++---- .../druid/msq/indexing/MSQControllerTask.java | 6 + .../druid/msq/indexing/MSQWorkerTask.java | 7 ++ .../error/InsertLockPreemptedFault.java | 4 +- .../druid/msq/exec/ControllerImplTest.java | 104 ++++++++++++++++++ .../msq/test/MSQTestTaskActionClient.java | 8 ++ 7 files changed, 167 insertions(+), 25 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index dbb4f2b65c7..1a6d58d0633 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -332,7 +332,7 @@ Each task type has a different default lock priority. The below table shows the |task type|default priority| |---------|----------------| |Realtime index task|75| -|Batch index task|50| +|Batch index tasks, including [native batch](native-batch.md), [SQL](../multi-stage-query/index.md), and [Hadoop-based](hadoop.md)|50| |Merge/Append/Compaction task|25| |Other tasks|0| diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 647a7567575..a4812292fb1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -59,8 +59,9 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -1090,30 +1091,17 @@ public class ControllerImpl implements Controller .submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval)); } } else { - try { - context.taskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)); - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments) + ); } } else if (!segments.isEmpty()) { // Append mode. - try { - context.taskActionClient().submit(new SegmentInsertAction(segments)); - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.appendAction(segments, null, null) + ); } } @@ -1863,6 +1851,32 @@ public class ControllerImpl implements Controller return retVal; } + /** + * Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments. + * + * Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption. + */ + static void performSegmentPublish( + final TaskActionClient client, + final SegmentTransactionalInsertAction action + ) throws IOException + { + try { + final SegmentPublishResult result = client.submit(action); + + if (!result.isSuccess()) { + throw new MSQException(InsertLockPreemptedFault.instance()); + } + } + catch (Exception e) { + if (isTaskLockPreemptedException(e)) { + throw new MSQException(e, InsertLockPreemptedFault.instance()); + } else { + throw e; + } + } + } + /** * Method that determines whether an exception was raised due to the task lock for the controller task being * preempted. Uses string comparison, because the relevant Overlord APIs do not have a more reliable way of @@ -1874,6 +1888,9 @@ public class ControllerImpl implements Controller private static boolean isTaskLockPreemptedException(Exception e) { final String exceptionMsg = e.getMessage(); + if (exceptionMsg == null) { + return false; + } final List validExceptionExcerpts = ImmutableList.of( "are not covered by locks" /* From TaskLocks */, "is preempted and no longer valid" /* From SegmentAllocateAction */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c6fda274536..76d079348be 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -204,6 +204,12 @@ public class MSQControllerTask extends AbstractTask } } + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } + private static String getDataSourceForTaskMetadata(final MSQSpec querySpec) { final MSQDestination destination = querySpec.getDestination(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 04eafc49cc4..e8117a015f5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; @@ -112,4 +113,10 @@ public class MSQWorkerTask extends AbstractTask worker.stopGracefully(); } } + + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java index 83bd9ad8e62..c355dc3ff67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java @@ -32,8 +32,8 @@ public class InsertLockPreemptedFault extends BaseMSQFault { super( CODE, - "Insert lock preempted while trying to ingest the data." - + " This can occur if there are higher priority jobs like real-time ingestion running on same time chunks." + "Lock preempted while trying to ingest the data. This can occur if there are higher priority tasks, such as " + + "real-time ingestion, running on the same time chunks." ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java new file mode 100644 index 00000000000..b7b0c4b825f --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; +import org.apache.druid.msq.indexing.error.MSQException; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +public class ControllerImplTest +{ + @Test + public void test_performSegmentPublish_ok() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.ok(Collections.emptySet())); + EasyMock.replay(taskActionClient); + + // All OK. + ControllerImpl.performSegmentPublish(taskActionClient, action); + } + + @Test + public void test_performSegmentPublish_publishFail() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.fail("oops")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } + + @Test + public void test_performSegmentPublish_publishException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("oops")); + EasyMock.replay(taskActionClient); + + final ISE e = Assert.assertThrows( + ISE.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals("oops", e.getMessage()); + } + + @Test + public void test_performSegmentPublish_publishLockPreemptedException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("are not covered by locks")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 5022cfe9291..2e6ee4a9bc6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -27,16 +27,20 @@ import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -91,6 +95,10 @@ public class MSQTestTaskActionClient implements TaskActionClient )); } else if (taskAction instanceof RetrieveUsedSegmentsAction) { return (RetType) ImmutableSet.of(); + } else if (taskAction instanceof SegmentTransactionalInsertAction) { + // Always OK. + final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); + return (RetType) SegmentPublishResult.ok(segments); } else { return null; }