diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 42515a1779a..687660ba750 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -21,6 +21,7 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Injector; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -83,6 +84,11 @@ public interface ControllerContext */ TaskActionClient taskActionClient(); + /** + * Task lock type. + */ + TaskLockType taskLockType(); + /** * Provides services about workers: starting, canceling, obtaining status. * diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 019e998182f..96c46635662 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -927,7 +927,7 @@ public class ControllerImpl implements Controller destination, partitionBoundaries, keyReader, - MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), false), + context.taskLockType(), isStageOutputEmpty ); } @@ -1335,10 +1335,7 @@ public class ControllerImpl implements Controller (DataSourceMSQDestination) querySpec.getDestination(); final Set segmentsWithTombstones = new HashSet<>(segments); int numTombstones = 0; - final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType( - QueryContext.of(querySpec.getQuery().getContext()), - destination.isReplaceTimeChunks() - ); + final TaskLockType taskLockType = context.taskLockType(); if (destination.isReplaceTimeChunks()) { final List intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments")); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index c148e7fc1bb..ca93c673a4b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import com.google.inject.Key; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -168,6 +169,12 @@ public class IndexerControllerContext implements ControllerContext return toolbox.getTaskActionClient(); } + @Override + public TaskLockType taskLockType() + { + return task.getTaskLockType(); + } + @Override public WorkerClient newWorkerClient() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 4ddc8274b9d..dc985e26fef 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -55,6 +55,7 @@ import org.apache.druid.msq.indexing.destination.MSQDestination; import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; @@ -234,8 +235,7 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, { // If we're in replace mode, acquire locks for all intervals before declaring the task ready. if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) { - final TaskLockType taskLockType = - MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), true); + final TaskLockType taskLockType = getTaskLockType(); final List intervals = ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks(); log.debug( @@ -306,6 +306,26 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } + @Nullable + public TaskLockType getTaskLockType() + { + if (isIngestion(querySpec)) { + return MultiStageQueryContext.validateAndGetTaskLockType( + QueryContext.of( + // Use the task context and override with the query context + QueryContexts.override( + getContext(), + querySpec.getQuery().getContext() + ) + ), + ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks() + ); + } else { + // Locks need to be acquired only if data is being ingested into a DataSource + return null; + } + } + private static String getDataSourceForTaskMetadata(final MSQSpec querySpec) { final MSQDestination destination = querySpec.getDestination(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 8d974285fb5..467a5bab49f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -46,85 +47,56 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; public class MSQControllerTaskTest { - private final List INTERVALS = - Collections.singletonList(Intervals.of( - "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")); + private static final List INTERVALS = Collections.singletonList( + Intervals.of("2011-04-01/2011-04-03") + ); - private final MSQSpec MSQ_SPEC = MSQSpec - .builder() - .destination(new DataSourceMSQDestination( - "target", - Granularities.DAY, - null, - INTERVALS, - null, - null - )) - .query(new Druids.ScanQueryBuilder() - .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) - .dataSource("target") - .build() - ) - .columnMappings(new ColumnMappings(ImmutableList.of(new ColumnMapping("a0", "cnt")))) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .build(); + private static MSQSpec.Builder msqSpecBuilder() + { + return MSQSpec + .builder() + .destination( + new DataSourceMSQDestination("target", Granularities.DAY, null, INTERVALS, null, null) + ) + .query( + new Druids.ScanQueryBuilder() + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .build() + ) + .columnMappings(new ColumnMappings(ImmutableList.of(new ColumnMapping("a0", "cnt")))) + .tuningConfig(MSQTuningConfig.defaultConfig()); + } @Test public void testGetInputSourceResources() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); - Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty()); + Assert.assertTrue(createControllerTask(msqSpecBuilder()).getInputSourceResources().isEmpty()); } @Test public void testGetDefaultLookupLoadingSpec() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } @Test public void testGetDefaultBroadcastDatasourceLoadingSpec() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec()); } @Test public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { - MSQSpec build = MSQSpec + MSQSpec.Builder builder = MSQSpec .builder() .query(new Druids.ScanQueryBuilder() .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) @@ -137,54 +109,83 @@ public class MSQControllerTaskTest .build() ) .columnMappings(new ColumnMappings(Collections.emptyList())) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .build(); - MSQControllerTask controllerTask = new MSQControllerTask( - null, - build, - null, - null, - null, - null, - null, - null - ); + .tuningConfig(MSQTuningConfig.defaultConfig()); - // Va;idate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. - Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); + // Validate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. + Assert.assertEquals(LookupLoadingSpec.NONE, createControllerTask(builder).getLookupLoadingSpec()); } @Test public void testGetTaskAllocatorId() { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, - null, - null, - null, - null, - null, - null + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); + Assert.assertEquals(controllerTask.getId(), controllerTask.getTaskAllocatorId()); + } + + @Test + public void testGetTaskLockType() + { + final DataSourceMSQDestination appendDestination + = new DataSourceMSQDestination("target", Granularities.DAY, null, null, null, null); + Assert.assertEquals( + TaskLockType.SHARED, + createControllerTask(msqSpecBuilder().destination(appendDestination)).getTaskLockType() + ); + + final DataSourceMSQDestination replaceDestination + = new DataSourceMSQDestination("target", Granularities.DAY, null, INTERVALS, null, null); + Assert.assertEquals( + TaskLockType.EXCLUSIVE, + createControllerTask(msqSpecBuilder().destination(replaceDestination)).getTaskLockType() + ); + + // With 'useConcurrentLocks' in task context + final Map taskContext = Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS, true); + final MSQControllerTask appendTaskWithContext = new MSQControllerTask( + null, + msqSpecBuilder().destination(appendDestination).build(), + null, + null, + null, + null, + null, + taskContext + ); + Assert.assertEquals(TaskLockType.APPEND, appendTaskWithContext.getTaskLockType()); + + final MSQControllerTask replaceTaskWithContext = new MSQControllerTask( + null, + msqSpecBuilder().destination(replaceDestination).build(), + null, + null, + null, + null, + null, + taskContext + ); + Assert.assertEquals(TaskLockType.REPLACE, replaceTaskWithContext.getTaskLockType()); + + // With 'useConcurrentLocks' in query context + final Map queryContext = Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS, true); + final ScanQuery query = new Druids.ScanQueryBuilder() + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .context(queryContext) + .build(); + Assert.assertEquals( + TaskLockType.APPEND, + createControllerTask(msqSpecBuilder().query(query).destination(appendDestination)).getTaskLockType() + ); + Assert.assertEquals( + TaskLockType.REPLACE, + createControllerTask(msqSpecBuilder().query(query).destination(replaceDestination)).getTaskLockType() ); - Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId()); } @Test public void testIsReady() throws Exception { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); TestTaskActionClient taskActionClient = new TestTaskActionClient( new TimeChunkLock( TaskLockType.REPLACE, @@ -195,24 +196,14 @@ public class MSQControllerTaskTest 0 ) ); - Assert.assertTrue(controllerTask.isReady(taskActionClient)); + Assert.assertTrue(createControllerTask(msqSpecBuilder()).isReady(taskActionClient)); } @Test public void testIsReadyWithRevokedLock() { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); - TestTaskActionClient taskActionClient = new TestTaskActionClient( + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); + TaskActionClient taskActionClient = new TestTaskActionClient( new TimeChunkLock( TaskLockType.REPLACE, "groupId", @@ -225,10 +216,17 @@ public class MSQControllerTaskTest ); DruidException exception = Assert.assertThrows( DruidException.class, - () -> controllerTask.isReady(taskActionClient)); + () -> controllerTask.isReady(taskActionClient) + ); Assert.assertEquals( "Lock of type[REPLACE] for interval[2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z] was revoked", - exception.getMessage()); + exception.getMessage() + ); + } + + private static MSQControllerTask createControllerTask(MSQSpec.Builder specBuilder) + { + return new MSQControllerTask("controller_1", specBuilder.build(), null, null, null, null, null, null, null); } private static class TestTaskActionClient implements TaskActionClient diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index a7ec6054b56..970d873c96c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -37,6 +37,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -55,6 +56,7 @@ import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; +import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; @@ -107,6 +109,7 @@ public class MSQTestControllerContext implements ControllerContext private Controller controller; private final WorkerMemoryParameters workerMemoryParameters; private final QueryContext queryContext; + private final MSQControllerTask controllerTask; public MSQTestControllerContext( ObjectMapper mapper, @@ -114,7 +117,7 @@ public class MSQTestControllerContext implements ControllerContext TaskActionClient taskActionClient, WorkerMemoryParameters workerMemoryParameters, List loadedSegments, - QueryContext queryContext + MSQControllerTask controllerTask ) { this.mapper = mapper; @@ -134,7 +137,8 @@ public class MSQTestControllerContext implements ControllerContext .collect(Collectors.toList()) ); this.workerMemoryParameters = workerMemoryParameters; - this.queryContext = queryContext; + this.controllerTask = controllerTask; + this.queryContext = controllerTask.getQuerySpec().getQuery().context(); } OverlordClient overlordClient = new NoopOverlordClient() @@ -322,6 +326,12 @@ public class MSQTestControllerContext implements ControllerContext return taskActionClient; } + @Override + public TaskLockType taskLockType() + { + return controllerTask.getTaskLockType(); + } + @Override public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java index a565283154f..6a7db8aa5b6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java @@ -103,7 +103,7 @@ public class MSQTestOverlordServiceClient extends NoopOverlordClient taskActionClient, workerMemoryParameters, loadedSegmentMetadata, - cTask.getQuerySpec().getQuery().context() + cTask ); inMemoryControllerTask.put(cTask.getId(), cTask);