diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index dbb4f2b65c7..1a6d58d0633 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -332,7 +332,7 @@ Each task type has a different default lock priority. The below table shows the |task type|default priority| |---------|----------------| |Realtime index task|75| -|Batch index task|50| +|Batch index tasks, including [native batch](native-batch.md), [SQL](../multi-stage-query/index.md), and [Hadoop-based](hadoop.md)|50| |Merge/Append/Compaction task|25| |Other tasks|0| diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 647a7567575..a4812292fb1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -59,8 +59,9 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -1090,30 +1091,17 @@ public class ControllerImpl implements Controller .submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval)); } } else { - try { - context.taskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)); - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments) + ); } } else if (!segments.isEmpty()) { // Append mode. - try { - context.taskActionClient().submit(new SegmentInsertAction(segments)); - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.appendAction(segments, null, null) + ); } } @@ -1863,6 +1851,32 @@ public class ControllerImpl implements Controller return retVal; } + /** + * Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments. + * + * Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption. + */ + static void performSegmentPublish( + final TaskActionClient client, + final SegmentTransactionalInsertAction action + ) throws IOException + { + try { + final SegmentPublishResult result = client.submit(action); + + if (!result.isSuccess()) { + throw new MSQException(InsertLockPreemptedFault.instance()); + } + } + catch (Exception e) { + if (isTaskLockPreemptedException(e)) { + throw new MSQException(e, InsertLockPreemptedFault.instance()); + } else { + throw e; + } + } + } + /** * Method that determines whether an exception was raised due to the task lock for the controller task being * preempted. Uses string comparison, because the relevant Overlord APIs do not have a more reliable way of @@ -1874,6 +1888,9 @@ public class ControllerImpl implements Controller private static boolean isTaskLockPreemptedException(Exception e) { final String exceptionMsg = e.getMessage(); + if (exceptionMsg == null) { + return false; + } final List validExceptionExcerpts = ImmutableList.of( "are not covered by locks" /* From TaskLocks */, "is preempted and no longer valid" /* From SegmentAllocateAction */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c6fda274536..76d079348be 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -204,6 +204,12 @@ public class MSQControllerTask extends AbstractTask } } + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } + private static String getDataSourceForTaskMetadata(final MSQSpec querySpec) { final MSQDestination destination = querySpec.getDestination(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 04eafc49cc4..e8117a015f5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; @@ -112,4 +113,10 @@ public class MSQWorkerTask extends AbstractTask worker.stopGracefully(); } } + + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java index 83bd9ad8e62..c355dc3ff67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java @@ -32,8 +32,8 @@ public class InsertLockPreemptedFault extends BaseMSQFault { super( CODE, - "Insert lock preempted while trying to ingest the data." - + " This can occur if there are higher priority jobs like real-time ingestion running on same time chunks." + "Lock preempted while trying to ingest the data. This can occur if there are higher priority tasks, such as " + + "real-time ingestion, running on the same time chunks." ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java new file mode 100644 index 00000000000..b7b0c4b825f --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; +import org.apache.druid.msq.indexing.error.MSQException; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +public class ControllerImplTest +{ + @Test + public void test_performSegmentPublish_ok() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.ok(Collections.emptySet())); + EasyMock.replay(taskActionClient); + + // All OK. + ControllerImpl.performSegmentPublish(taskActionClient, action); + } + + @Test + public void test_performSegmentPublish_publishFail() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.fail("oops")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } + + @Test + public void test_performSegmentPublish_publishException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("oops")); + EasyMock.replay(taskActionClient); + + final ISE e = Assert.assertThrows( + ISE.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals("oops", e.getMessage()); + } + + @Test + public void test_performSegmentPublish_publishLockPreemptedException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("are not covered by locks")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 5022cfe9291..2e6ee4a9bc6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -27,16 +27,20 @@ import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -91,6 +95,10 @@ public class MSQTestTaskActionClient implements TaskActionClient )); } else if (taskAction instanceof RetrieveUsedSegmentsAction) { return (RetType) ImmutableSet.of(); + } else if (taskAction instanceof SegmentTransactionalInsertAction) { + // Always OK. + final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); + return (RetType) SegmentPublishResult.ok(segments); } else { return null; }