diff --git a/.idea/misc.xml b/.idea/misc.xml index bf2061d7392..e66748b7823 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -31,6 +31,7 @@ + @@ -46,7 +47,7 @@ @@ -84,7 +92,7 @@ - + - + \ No newline at end of file diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java index bd1db5c1f02..f68ddfa9011 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.azure; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; @@ -35,12 +36,15 @@ import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory; import org.apache.druid.storage.azure.AzureInputDataConfig; import org.apache.druid.storage.azure.AzureStorage; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; /** * Abstracts the Azure storage system where input data is stored. Allows users to retrieve entities in @@ -77,6 +81,14 @@ public class AzureInputSource extends CloudObjectInputSource this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig"); } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(SCHEME); + } + @Override public SplittableInputSource> withSplit(InputSplit> split) { diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java index a8fc402eb87..eefa5bfcb90 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.azure; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputSplit; @@ -265,6 +266,23 @@ public class AzureInputSourceTest extends EasyMockSupport Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}", actualToString); } + @Test + public void test_getTypes_returnsExpectedTypes() + { + List prefixes = ImmutableList.of(PREFIX_URI); + azureInputSource = new AzureInputSource( + storage, + entityFactory, + azureCloudBlobIterableFactory, + inputDataConfig, + EMPTY_URIS, + prefixes, + EMPTY_OBJECTS, + null + ); + Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME), azureInputSource.getTypes()); + } + @Test public void abidesEqualsContract() { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 55f6d173ec6..3e2e50d6bc7 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.google; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.Iterators; @@ -36,15 +37,19 @@ import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorageDruidModule; import org.apache.druid.storage.google.GoogleUtils; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.math.BigInteger; import java.net.URI; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; public class GoogleCloudStorageInputSource extends CloudObjectInputSource { + static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME; private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); private final GoogleStorage storage; @@ -65,6 +70,14 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource this.inputDataConfig = inputDataConfig; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + @Override protected InputEntity createEntity(CloudObjectLocation location) { diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index 41308e54c28..404fd45d4d0 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -153,6 +153,14 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe Assert.assertEquals(withObjects, serdeWithObjects); } + @Test + public void testGetTypes() + { + final GoogleCloudStorageInputSource inputSource = + new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null); + Assert.assertEquals(Collections.singleton(GoogleCloudStorageInputSource.TYPE_KEY), inputSource.getTypes()); + } + @Test public void testWithUrisSplit() throws Exception { diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index 7faebccf268..9e76a69ec3b 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.inputsource.hdfs; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.Hdfs; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.storage.hdfs.HdfsStorageDruidModule; import org.apache.druid.utils.Streams; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -48,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -56,11 +59,13 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource> { + static final String TYPE_KEY = HdfsStorageDruidModule.SCHEME; private static final String PROP_PATHS = "paths"; private final List inputPaths; @@ -91,6 +96,14 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p)); } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + public static List coerceInputPathsToList(Object inputPaths, String propertyName) { if (inputPaths instanceof String) { diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index 68c7650960b..a1443ea50b7 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -50,7 +50,7 @@ import java.util.Properties; */ public class HdfsStorageDruidModule implements DruidModule { - static final String SCHEME = "hdfs"; + public static final String SCHEME = "hdfs"; private Properties props = null; @Inject diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index e1f112ad008..879e49f0c7e 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -163,6 +163,20 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) .build(); } + + @Test + public void testGetTypes() + { + final Configuration conf = new Configuration(); + conf.set("fs.default.name", "hdfs://localhost:7020"); + HdfsInputSource inputSource = HdfsInputSource.builder() + .paths("/foo/bar*") + .configuration(conf) + .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG) + .build(); + + Assert.assertEquals(Collections.singleton(HdfsInputSource.TYPE_KEY), inputSource.getTypes()); + } } public static class SerializeDeserializeTest diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 700d2d12d91..8ba36124c29 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -30,12 +31,20 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import javax.annotation.Nonnull; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; public class KafkaIndexTask extends SeekableStreamIndexTask { + public static final String INPUT_SOURCE_TYPE = "kafka"; private static final String TYPE = "index_kafka"; private final ObjectMapper configMapper; @@ -132,6 +141,17 @@ public class KafkaIndexTask extends SeekableStreamIndexTask getInputSourceResources() + { + return Collections.singleton(new ResourceAction( + new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE), + Action.READ + )); + } + @Override public boolean supportsQueries() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 55f69e1cdae..f2bee7f431a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.guice.annotations.Json; @@ -35,13 +36,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Map; +import java.util.Set; public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec { - private static final String TASK_TYPE = "kafka"; + static final String TASK_TYPE = "kafka"; @JsonCreator public KafkaSupervisorSpec( @@ -92,6 +100,17 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec return TASK_TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceTypes() + { + return Collections.singleton(new ResourceAction( + new Resource(ResourceType.EXTERNAL, TASK_TYPE), + Action.READ + )); + } + @Override public String getSource() { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 0ba17cdc388..42f2154f208 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -111,6 +111,10 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -2672,6 +2676,45 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase Assert.assertEquals(task, task1); } + @Test + public void testCorrectInputSources() throws Exception + { + // This is both a serde test and a regression test for https://github.com/apache/druid/issues/7724. + + final KafkaIndexTask task = createTask( + "taskid", + NEW_DATA_SCHEMA.withTransformSpec( + new TransformSpec( + null, + ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil())) + ) + ), + new KafkaIndexTaskIOConfig( + 0, + "sequence", + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()), + ImmutableMap.of(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + INPUT_FORMAT, + null + ) + ); + + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + KafkaIndexTask.INPUT_SOURCE_TYPE + ), Action.READ)), + task.getInputSourceResources() + ); + } + + /** * Wait for a task to consume certain offsets (inclusive). */ diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 28b915d22cd..09a8a7a645f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; @@ -433,6 +437,13 @@ public class KafkaSupervisorTest extends EasyMockSupport Long.MAX_VALUE, (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2) ); + Assert.assertEquals( + Collections.singleton(new ResourceAction( + new Resource(ResourceType.EXTERNAL, KafkaSupervisorSpec.TASK_TYPE), + Action.READ + )), + testableSupervisorSpec.getInputSourceTypes() + ); autoscaler.reset(); autoscaler.stop(); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index d8fbd6a36bb..dc3d64af362 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -34,12 +35,20 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.utils.RuntimeInfo; +import javax.annotation.Nonnull; +import java.util.Collections; import java.util.Map; +import java.util.Set; public class KinesisIndexTask extends SeekableStreamIndexTask { + public static final String INPUT_SOURCE_TYPE = "kinesis"; private static final String TYPE = "index_kinesis"; private static final Logger log = new Logger(KinesisIndexTask.class); @@ -147,6 +156,17 @@ public class KinesisIndexTask extends SeekableStreamIndexTask getInputSourceResources() + { + return Collections.singleton(new ResourceAction( + new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE), + Action.READ + )); + } + @Override public boolean supportsQueries() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index f210ca69760..026e4ac4fb5 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis.supervisor; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.name.Named; @@ -38,13 +39,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Map; +import java.util.Set; public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec { - private static final String SUPERVISOR_TYPE = "kinesis"; + static final String SUPERVISOR_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; @JsonCreator @@ -114,6 +122,17 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec return SUPERVISOR_TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceTypes() + { + return Collections.singleton(new ResourceAction( + new Resource(ResourceType.EXTERNAL, SUPERVISOR_TYPE), + Action.READ + )); + } + @Override public String getSource() { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 9d9b3dd417d..097a7784a89 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -36,6 +36,10 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -127,6 +131,14 @@ public class KinesisIndexTaskSerdeTest Assert.assertEquals(ACCESS_KEY, awsCredentialsConfig.getAccessKey().getPassword()); Assert.assertEquals(SECRET_KEY, awsCredentialsConfig.getSecretKey().getPassword()); Assert.assertEquals(FILE_SESSION_CREDENTIALS, awsCredentialsConfig.getFileSessionCredentials()); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + KinesisIndexTask.INPUT_SOURCE_TYPE + ), Action.READ)), + target.getInputSourceResources() + ); } private static ObjectMapper createObjectMapper() diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 265a9fc144f..079e5314e80 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -88,6 +88,10 @@ import org.apache.druid.segment.realtime.FireDepartment; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; @@ -4080,6 +4084,59 @@ public class KinesisSupervisorTest extends EasyMockSupport testShardSplitPhaseThree(phaseTwoTasks); } + @Test + public void testCorrectInputSources() + { + KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec( + null, + dataSchema, + null, + new KinesisSupervisorIOConfig( + STREAM, + null, + null, + null, + null, + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null, + null, + null, + false + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + Assert.assertEquals( + Collections.singleton( + new ResourceAction( + new Resource(ResourceType.EXTERNAL, KinesisSupervisorSpec.SUPERVISOR_TYPE), + Action.READ + )), + supervisorSpec.getInputSourceTypes() + ); + } + private List testShardSplitPhaseOne() throws Exception { supervisorRecordSupplier.assign(EasyMock.anyObject()); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 7a52b672f71..106d33216f2 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -30,6 +30,7 @@ import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; @@ -56,13 +57,16 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.net.URI; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.UUID; public class S3InputSource extends CloudObjectInputSource { + public static final String TYPE_KEY = S3StorageDruidModule.SCHEME; // We lazily initialize ServerSideEncryptingAmazonS3 to avoid costly s3 operation when we only need S3InputSource // for stored information (such as for task logs) and not for ingestion. // (This cost only applies for new ServerSideEncryptingAmazonS3 created with s3InputSourceConfig given). @@ -236,6 +240,14 @@ public class S3InputSource extends CloudObjectInputSource this.maxRetries = maxRetries; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + private void applyAssumeRole( ServerSideEncryptingAmazonS3.Builder s3ClientBuilder, S3InputSourceConfig s3InputSourceConfig, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index c742af8e5f7..fc538b682fc 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -336,6 +336,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER); } + @Test + public void testGetTypes() + { + final S3InputSource inputSource = new S3InputSource( + SERVICE, + SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER, + INPUT_DATA_CONFIG, + EXPECTED_URIS, + null, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(Collections.singleton(S3InputSource.TYPE_KEY), inputSource.getTypes()); + } + @Test public void testS3InputSourceUseEndPointClientProxy() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 8387d137f11..0428b8e42f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -30,6 +30,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.curator.shaded.com.google.common.base.Verify; @@ -90,6 +91,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; +import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; @@ -395,6 +397,14 @@ public class CompactionTask extends AbstractBatchIndexTask return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + return ImmutableSet.of(); + } + @Override public int getPriority() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index e2b5bd99177..d81167fddf6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -67,11 +67,15 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.util.ToolRunner; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; @@ -85,9 +89,11 @@ import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class HadoopIndexTask extends HadoopTask implements ChatHandler @@ -193,6 +199,14 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler return "index_hadoop"; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + return Collections.singleton(new ResourceAction(new Resource(ResourceType.EXTERNAL, "hadoop"), Action.READ)); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 31acf6e6b6a..b4d47080828 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; @@ -29,6 +30,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ListenableFuture; @@ -97,6 +99,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -130,6 +135,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; public class IndexTask extends AbstractBatchIndexTask implements ChatHandler { @@ -288,6 +294,22 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler } } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (ingestionSchema.getIOConfig().firehoseFactory != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @GET @Path("/unparseableEvents") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index bce3eb41839..e4c112100e4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.indexer.TaskStatus; @@ -33,9 +34,12 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.security.ResourceAction; +import javax.annotation.Nonnull; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; /** @@ -100,6 +104,14 @@ public class NoopTask extends AbstractTask return "noop"; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + return ImmutableSet.of(); + } + @JsonProperty public long getRunTime() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 9a9d49670d6..1cc2d329c88 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -36,10 +37,15 @@ import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmen import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask; import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.security.ResourceAction; +import javax.annotation.Nonnull; import java.util.Map; +import java.util.Set; /** * Represents a task that can run on a worker. The general contracts surrounding Tasks are: @@ -138,6 +144,33 @@ public interface Task */ String getDataSource(); + /** + * @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if + * the task does not use any. Users can be given permission to access particular types of + * input sources but not others, using the + * {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config. + * @throws UnsupportedOperationException if the given task type does not suppoert input source based security. Such + * would be the case, if the task uses firehose. + */ + @JsonIgnore + @Nonnull + default Set getInputSourceResources() throws UOE + { + throw new UOE(StringUtils.format( + "Task type [%s], does not support input source based security", + getType() + )); + } + + default UOE getInputSecurityOnFirehoseUnsupportedError() + { + throw new UOE(StringUtils.format( + "Input source based security cannot be performed '%s' task because it uses firehose." + + " Change the tasks configuration, or disable `isEnableInputSourceSecurity`", + getType() + )); + } + /** * Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method * should return null. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java index 71dbec7bfe5..683a406c425 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java @@ -20,11 +20,20 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class LegacySinglePhaseSubTask extends SinglePhaseSubTask { @@ -56,4 +65,20 @@ public class LegacySinglePhaseSubTask extends SinglePhaseSubTask { return SinglePhaseSubTask.OLD_TYPE_NAME; } + + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 90850eb2bdb..852df1de85a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -20,12 +20,14 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.Union; import org.apache.datasketches.memory.Memory; @@ -76,6 +78,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlers; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BuildingShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -270,6 +275,22 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @JsonProperty("spec") public ParallelIndexIngestionSpec getIngestionSchema() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java index 250df1afc3f..1cefdb5ae2a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java @@ -21,11 +21,13 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.apache.datasketches.hll.HllSketch; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; @@ -43,16 +45,23 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.partition.HashPartitioner; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask { @@ -133,6 +142,22 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java index 3764ec3884b..bcb4403e57e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java @@ -20,10 +20,12 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; @@ -49,14 +51,21 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task @@ -174,6 +183,22 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 1eb141c3f58..d79ce2b7479 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -20,7 +20,9 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -33,6 +35,10 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultInde import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.Interval; @@ -42,6 +48,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -131,6 +138,22 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index ab966e67d5b..8402ad6b58c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; @@ -35,14 +37,20 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder; import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis; import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -148,6 +156,22 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask< return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws IOException { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 03a49efaf59..8706f8227af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -27,6 +28,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; import org.apache.druid.indexer.IngestionState; @@ -70,6 +72,9 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; @@ -77,6 +82,7 @@ import org.apache.druid.timeline.partition.PartitionChunk; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.joda.time.Interval; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; @@ -96,6 +102,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task @@ -190,6 +197,22 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand return TYPE; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceResources() + { + if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) { + throw getInputSecurityOnFirehoseUnsupportedError(); + } + return getIngestionSchema().getIOConfig().getInputSource() != null ? + getIngestionSchema().getIOConfig().getInputSource().getTypes() + .stream() + .map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ)) + .collect(Collectors.toSet()) : + ImmutableSet.of(); + } + @Override public boolean isReady(TaskActionClient taskActionClient) throws IOException { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index ad3e57cba74..a64b67b3b83 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord.http; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -61,6 +62,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.TaskLookup; @@ -73,6 +75,7 @@ import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; @@ -106,6 +109,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -131,6 +135,8 @@ public class OverlordResource private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter; private final ProvisioningStrategy provisioningStrategy; + private final AuthConfig authConfig; + private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); @@ -162,7 +168,8 @@ public class OverlordResource AuditManager auditManager, AuthorizerMapper authorizerMapper, WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter, - ProvisioningStrategy provisioningStrategy + ProvisioningStrategy provisioningStrategy, + AuthConfig authConfig ) { this.taskMaster = taskMaster; @@ -174,6 +181,7 @@ public class OverlordResource this.authorizerMapper = authorizerMapper; this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter; this.provisioningStrategy = provisioningStrategy; + this.authConfig = authConfig; } /** @@ -187,15 +195,24 @@ public class OverlordResource @Produces(MediaType.APPLICATION_JSON) public Response taskPost(final Task task, @Context final HttpServletRequest req) { - final String dataSource = task.getDataSource(); - final ResourceAction resourceAction = new ResourceAction( - new Resource(dataSource, ResourceType.DATASOURCE), - Action.WRITE - ); + final Set resourceActions; + try { + resourceActions = getNeededResourceActionsForTask(task); + } + catch (UOE e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + ImmutableMap.of( + "error", + e.getMessage() + ) + ) + .build(); + } - Access authResult = AuthorizationUtils.authorizeResourceAction( + Access authResult = AuthorizationUtils.authorizeAllResourceActions( req, - resourceAction, + resourceActions, authorizerMapper ); @@ -1086,6 +1103,18 @@ public class OverlordResource } } + @VisibleForTesting + Set getNeededResourceActionsForTask(Task task) throws UOE + { + final String dataSource = task.getDataSource(); + final Set resourceActions = new HashSet<>(); + resourceActions.add(new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE)); + if (authConfig.isEnableInputSourceSecurity()) { + resourceActions.addAll(task.getInputSourceResources()); + } + return resourceActions; + } + private List securedTaskStatusPlus( List collectionToFilter, @Nullable String dataSource, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 8daedf81c9c..5fb5722dca4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -1280,6 +1281,20 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); } + @Test(timeout = 60_000L) + public void testInputSourceResourcesThrowException() + { + // Expect 2 segments as we will hit maxTotalRows + expectPublishedSegments(2); + + final AppenderatorDriverRealtimeIndexTask task = + makeRealtimeTask(null, Integer.MAX_VALUE, 1500L); + Assert.assertThrows( + UOE.class, + task::getInputSourceResources + ); + } + private ListenableFuture runTask(final Task task) { try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index f91e67dc7c6..3a713f74e4e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -62,6 +62,10 @@ import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.NoopSegmentCacheManager; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; @@ -87,6 +91,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -922,6 +927,15 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis null ); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + LocalInputSource.TYPE_KEY + ), Action.READ)), + indexTask.getInputSourceResources() + ); + runTask(indexTask); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 19b178caa34..9815dde8fea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -673,6 +673,25 @@ public class CompactionTaskTest assertEquals(expectedFromJson, fromJson); } + @Test + public void testInputSourceResources() + { + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY + ); + final CompactionTask task = builder + .inputSpec( + new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)) + ) + .tuningConfig(createTuningConfig()) + .context(ImmutableMap.of("testKey", "testContext")) + .build(); + + Assert.assertTrue(task.getInputSourceResources().isEmpty()); + } + @Test public void testGetTuningConfigWithIndexTuningConfig() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java new file mode 100644 index 00000000000..a99ad7b6bc1 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.HadoopIOConfig; +import org.apache.druid.indexer.HadoopIngestionSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class HadoopIndexTaskTest +{ + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testCorrectInputSourceTypes() + { + final HadoopIndexTask task = new HadoopIndexTask( + null, + new HadoopIngestionSpec( + new DataSchema( + "foo", null, new AggregatorFactory[0], new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P1D")) + ), + null, + jsonMapper + ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null + ), + null, + null, + "blah", + jsonMapper, + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null + ); + + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + "hadoop" + ), Action.READ)), + task.getInputSourceResources() + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 5b6dcbfcc42..c12e9c4308f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFacto import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -209,6 +213,54 @@ public class IndexTaskTest extends IngestionTestBase taskRunner = new TestTaskRunner(); } + @Test + public void testCorrectInputSourceTypes() throws IOException + { + File tmpDir = temporaryFolder.newFolder(); + IndexTask indexTask = new IndexTask( + null, + null, + new IndexIngestionSpec( + new DataSchema( + "test-json", + DEFAULT_TIMESTAMP_SPEC, + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("ts"), + new StringDimensionSchema("dim"), + new LongDimensionSchema("valDim") + ) + ), + new AggregatorFactory[]{new LongSumAggregatorFactory("valMet", "val")}, + new UniformGranularitySpec( + Granularities.DAY, + Granularities.MINUTE, + Collections.singletonList(Intervals.of("2014/P1D")) + ), + null + ), + new IndexIOConfig( + null, + new LocalInputSource(tmpDir, "druid*"), + DEFAULT_INPUT_FORMAT, + false, + false + ), + createTuningConfigWithMaxRowsPerSegment(10, true) + ), + null + ); + + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + LocalInputSource.TYPE_KEY + ), Action.READ)), + indexTask.getInputSourceResources() + ); + } + @Test public void testIngestNullOnlyColumns() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java new file mode 100644 index 00000000000..ad7a61fa204 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTaskTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.junit.Assert; +import org.junit.Test; + +public class NoopTaskTest +{ + @Test + public void testNullInputSources() + { + NoopTask task = new NoopTask("myID", null, null, 1, 0, null, null, null); + Assert.assertTrue(task.getInputSourceResources().isEmpty()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index c3c1f4b7800..3e5d1df89c2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -199,6 +200,16 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest Assert.assertTrue(task.supportsQueries()); } + @Test(timeout = 60_000L) + public void testInputSourceTypes() + { + final RealtimeIndexTask task = makeRealtimeTask(null); + Assert.assertThrows( + UOE.class, + task::getInputSourceResources + ); + } + @Test(timeout = 60_000L, expected = ExecutionException.class) public void testHandoffTimeout() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java new file mode 100644 index 00000000000..ca8458cf7fd --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunner; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class TaskTest +{ + private static final Task TASK = new Task() + { + @Override + public String getId() + { + return null; + } + + @Override + public String getGroupId() + { + return null; + } + + @Override + public TaskResource getTaskResource() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public String getNodeType() + { + return null; + } + + @Override + public String getDataSource() + { + return null; + } + + @Override + public QueryRunner getQueryRunner(Query query) + { + return null; + } + + @Override + public boolean supportsQueries() + { + return false; + } + + @Override + public String getClasspathPrefix() + { + return null; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) + { + return false; + } + + @Override + public boolean canRestore() + { + return false; + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + + } + + @Override + public TaskStatus run(TaskToolbox toolbox) + { + return null; + } + + @Override + public Map getContext() + { + return null; + } + }; + + @Test + public void testGetInputSourceTypes() + { + Assert.assertThrows( + UOE.class, + TASK::getInputSourceResources + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java index 1e529cbfe6b..d8e64b05397 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java @@ -44,6 +44,10 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.testing.junit.LoggerCaptureRule; import org.apache.logging.log4j.core.LogEvent; import org.easymock.Capture; @@ -114,6 +118,21 @@ public class PartialDimensionCardinalityTaskTest TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task); } + @Test + public void hasCorrectInputSourceTypes() + { + PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder() + .build(); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + InlineInputSource.TYPE_KEY + ), Action.READ)), + task.getInputSourceResources() + ); + } + @Test public void hasCorrectPrefixForAutomaticId() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java index 49fb4041380..2a9109e6804 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java @@ -37,9 +37,12 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution; import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.testing.junit.LoggerCaptureRule; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.apache.logging.log4j.core.LogEvent; @@ -108,14 +111,6 @@ public class PartialDimensionDistributionTaskTest .build(); } - @Test - public void serializesDeserializes() - { - PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder() - .build(); - TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task); - } - @Test public void hasCorrectPrefixForAutomaticId() { @@ -375,6 +370,22 @@ public class PartialDimensionDistributionTaskTest Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); } + @Test + public void testInputSourceResources() + { + PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder() + .build(); + + Assert.assertEquals( + Collections.singleton( + new ResourceAction( + new Resource(ResourceType.EXTERNAL, InlineInputSource.TYPE_KEY), + Action.READ + )), + task.getInputSourceResources() + ); + } + private DimensionDistributionReport runTask(PartialDimensionDistributionTaskBuilder taskBuilder) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java index 749921fff3c..37ff1a21504 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java @@ -30,6 +30,10 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.hamcrest.Matchers; import org.joda.time.Interval; import org.junit.Assert; @@ -39,6 +43,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.File; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -86,6 +91,19 @@ public class PartialHashSegmentGenerateTaskTest Assert.assertThat(id, Matchers.startsWith(PartialHashSegmentGenerateTask.TYPE)); } + @Test + public void hasCorrectInputSourceTypes() + { + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + LocalInputSource.TYPE_KEY + ), Action.READ)), + target.getInputSourceResources() + ); + } + @Test public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsReturningAnalysisOfValidNumBuckets() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java index 09016e192ee..f2a235ef71b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java @@ -30,6 +30,10 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.hamcrest.Matchers; import org.junit.Assert; @@ -101,6 +105,20 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu TestHelper.testSerializesDeserializes(getObjectMapper(), task); } + @Test + public void hasCorrectInputSourceTypes() + { + PartialRangeSegmentGenerateTask task = new PartialRangeSegmentGenerateTaskBuilder().build(); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + InlineInputSource.TYPE_KEY + ), Action.READ)), + task.getInputSourceResources() + ); + } + @Test public void hasCorrectPrefixForAutomaticId() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 0b5fbf2dce5..e57438b8cb8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -46,6 +46,10 @@ import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentTimeline; @@ -172,6 +176,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv final TaskActionClient subTaskActionClient = createActionClient(subTask); prepareTaskForLocking(subTask); Assert.assertTrue(subTask.isReady(subTaskActionClient)); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + LocalInputSource.TYPE_KEY + ), Action.READ)), + subTask.getInputSourceResources() + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java index d42f7e69200..ab260b9cf20 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -28,12 +28,17 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.Map; public class SinglePhaseSubTaskSpecTest @@ -88,5 +93,13 @@ public class SinglePhaseSubTaskSpecTest final byte[] json = mapper.writeValueAsBytes(expected); final Map actual = mapper.readValue(json, Map.class); Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type")); + Assert.assertEquals( + Collections.singleton( + new ResourceAction(new Resource( + ResourceType.EXTERNAL, + LocalInputSource.TYPE_KEY + ), Action.READ)), + expected.getInputSourceResources() + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index f01668fd37a..caf8b792df1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -52,6 +52,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.UOE; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -64,6 +65,8 @@ import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -87,6 +90,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; public class OverlordResourceTest @@ -95,6 +99,7 @@ public class OverlordResourceTest private TaskMaster taskMaster; private JacksonConfigManager configManager; private ProvisioningStrategy provisioningStrategy; + private AuthConfig authConfig; private TaskStorageQueryAdapter taskStorageQueryAdapter; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private HttpServletRequest req; @@ -110,6 +115,7 @@ public class OverlordResourceTest taskRunner = EasyMock.createMock(TaskRunner.class); configManager = EasyMock.createMock(JacksonConfigManager.class); provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); + authConfig = EasyMock.createMock(AuthConfig.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); @@ -162,7 +168,8 @@ public class OverlordResourceTest null, authMapper, workerTaskRunnerQueryAdapter, - provisioningStrategy + provisioningStrategy, + authConfig ); } @@ -175,7 +182,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); } @@ -189,7 +197,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.getLeader(); @@ -208,7 +217,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); // true @@ -253,7 +263,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource.getWaitingTasks(req) @@ -284,7 +295,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -325,7 +337,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource.getRunningTasks(null, req) @@ -373,7 +386,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource .getTasks(null, null, null, null, null, req) @@ -419,7 +433,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource @@ -465,7 +480,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource .getTasks( @@ -517,7 +533,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource @@ -566,7 +583,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource @@ -600,7 +618,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) @@ -634,7 +653,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); String interval = "2010-01-01_P1D"; List responseObjects = (List) overlordResource @@ -689,7 +709,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); // Verify that only the tasks of read access datasource are returned @@ -745,7 +766,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); // Verify that only the tasks of read access datasource are returned @@ -772,7 +794,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); // Verify that only the tasks of read access datasource are returned @@ -805,7 +828,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) @@ -824,7 +848,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); Object responseObject = overlordResource .getTasks("blah", "ds_test", null, null, null, req) @@ -840,6 +865,7 @@ public class OverlordResourceTest { expectedException.expect(ForbiddenException.class); expectAuthorizationTokenCheck(); + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); EasyMock.replay( taskRunner, @@ -847,7 +873,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); Task task = NoopTask.create(); overlordResource.taskPost(task, req); @@ -857,6 +884,7 @@ public class OverlordResourceTest public void testTaskPostDeniesDatasourceReadUser() { expectAuthorizationTokenCheck(Users.WIKI_READER); + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); EasyMock.replay( taskRunner, @@ -864,7 +892,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); // Verify that taskPost fails for user who has only datasource read access @@ -895,7 +924,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Map response = (Map) overlordResource @@ -924,7 +954,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response1 = overlordResource.getTaskPayload("mytask"); @@ -973,7 +1004,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response1 = overlordResource.getTaskStatus("mytask"); @@ -1031,7 +1063,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority); @@ -1057,7 +1090,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); Response response = overlordResource.getDatasourceLockedIntervals(null); @@ -1091,7 +1125,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, mockQueue, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Map response = (Map) overlordResource @@ -1142,7 +1177,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, mockQueue, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Map response = (Map) overlordResource @@ -1164,7 +1200,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); @@ -1185,7 +1222,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.enableWorker(host); @@ -1208,7 +1246,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.disableWorker(host); @@ -1231,7 +1270,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.enableWorker(host); @@ -1254,7 +1294,8 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, req, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + authConfig ); final Response response = overlordResource.disableWorker(host); @@ -1277,7 +1318,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, - configManager + configManager, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); @@ -1296,7 +1338,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, - configManager + configManager, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1316,7 +1359,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, - configManager + configManager, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1337,7 +1381,8 @@ public class OverlordResourceTest indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, - configManager + configManager, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1384,7 +1429,8 @@ public class OverlordResourceTest req, workerTaskRunnerQueryAdapter, configManager, - provisioningStrategy + provisioningStrategy, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1431,7 +1477,8 @@ public class OverlordResourceTest req, workerTaskRunnerQueryAdapter, configManager, - provisioningStrategy + provisioningStrategy, + authConfig ); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); @@ -1439,6 +1486,108 @@ public class OverlordResourceTest Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } + @Test + public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled() + { + + final String dataSource = "dataSourceTest"; + final String inputSourceType = "local"; + Task task = EasyMock.createMock(Task.class); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + EasyMock.expect(task.getDataSource()).andReturn(dataSource); + EasyMock.expect(task.getInputSourceResources()) + .andReturn(ImmutableSet.of(new ResourceAction( + new Resource(ResourceType.EXTERNAL, inputSourceType), + Action.READ + ))); + + EasyMock.replay( + task, + authConfig, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + Set expectedResourceActions = ImmutableSet.of( + new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE), + new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ) + ); + Set resourceActions = overlordResource.getNeededResourceActionsForTask(task); + Assert.assertEquals(expectedResourceActions, resourceActions); + } + + @Test + public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled() + { + + final String dataSource = "dataSourceTest"; + final UOE expectedException = new UOE("unsupported"); + Task task = EasyMock.createMock(Task.class); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + EasyMock.expect(task.getId()).andReturn("taskId"); + EasyMock.expect(task.getDataSource()).andReturn(dataSource); + EasyMock.expect(task.getInputSourceResources()).andThrow(expectedException); + + EasyMock.replay( + task, + authConfig, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + + final UOE e = Assert.assertThrows( + UOE.class, + () -> overlordResource.getNeededResourceActionsForTask(task) + ); + + Assert.assertEquals(expectedException, e); + } + + @Test + public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled() + { + + final String dataSource = "dataSourceTest"; + final String inputSourceType = "local"; + Task task = EasyMock.createMock(Task.class); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); + EasyMock.expect(task.getDataSource()).andReturn(dataSource); + EasyMock.expect(task.getInputSourceResources()) + .andReturn(ImmutableSet.of(new ResourceAction( + new Resource(ResourceType.EXTERNAL, inputSourceType), + Action.READ + ))); + + EasyMock.replay( + task, + authConfig, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + Set expectedResourceActions = ImmutableSet.of( + new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE) + ); + Set resourceActions = overlordResource.getNeededResourceActionsForTask(task); + Assert.assertEquals(expectedResourceActions, resourceActions); + } + private void expectAuthorizationTokenCheck() { expectAuthorizationTokenCheck(Users.DRUID); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 3ada645ff88..c33917de571 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -258,7 +258,8 @@ public class OverlordTest null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, workerTaskRunnerQueryAdapter, - null + null, + new AuthConfig() ); Response response = overlordResource.getLeader(); Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity()); diff --git a/processing/src/main/java/org/apache/druid/data/input/InputSource.java b/processing/src/main/java/org/apache/druid/data/input/InputSource.java index 793cad2fedc..be815742be1 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputSource.java @@ -28,9 +28,12 @@ import org.apache.druid.data.input.impl.HttpInputSource; import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.java.util.common.UOE; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; +import java.util.Set; /** * InputSource abstracts the storage system where input data is stored. It creates an {@link InputSourceReader} @@ -87,4 +90,16 @@ public interface InputSource @Nullable InputFormat inputFormat, File temporaryDirectory ); + + /** + * The types of input sources uses. A set is returned here, as some InputSource implementation allow for + * combining of multiple input sources. + * @return The types of input sources uses + */ + @JsonIgnore + @Nonnull + default Set getTypes() + { + throw new UOE("This inputSource does not support input source based security"); + } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java index 05899021a6e..9e3f84fe1cf 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/CombiningInputSource.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.AbstractInputSource; @@ -29,10 +30,13 @@ import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.java.util.common.Pair; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; /** @@ -61,6 +65,19 @@ public class CombiningInputSource extends AbstractInputSource implements Splitta this.delegates = delegates; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + Set types = new HashSet<>(); + for (InputSource delegate : delegates) { + types.addAll(delegate.getTypes()); + } + + return types; + } + @JsonProperty public List getDelegates() { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 616f05afa55..61b4a8e6753 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; @@ -34,12 +35,14 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.PasswordProvider; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.net.URI; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; public class HttpInputSource extends AbstractInputSource implements SplittableInputSource @@ -69,6 +72,14 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn this.config = config; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List uris) { for (URI uri : uris) { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java index eafe13aaec3..319750ef3e1 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java @@ -30,9 +30,12 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; +import java.util.Collections; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; public class InlineInputSource extends AbstractInputSource @@ -48,6 +51,14 @@ public class InlineInputSource extends AbstractInputSource this.data = data; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + @JsonProperty public String getData() { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 4a3dd9e257d..23208c94d76 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; @@ -45,12 +46,14 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.CollectionUtils; import org.apache.druid.utils.Streams; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -81,6 +84,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI } } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + public LocalInputSource(File baseDir, String filter) { this(baseDir, filter, null); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java index 1db194baec1..32c8117a9a5 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/CombiningInputSourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; @@ -42,8 +43,11 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,6 +70,24 @@ public class CombiningInputSourceTest Assert.assertEquals(combiningInputSource, fromJson); } + @Test + public void testGetTypes() + { + final ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class)); + final TestFileInputSource fileSource = new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile())); + final TestUriInputSource uriInputSource = new TestUriInputSource( + ImmutableList.of(URI.create("http://test.com/http-test"))); + final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of( + fileSource, + uriInputSource + )); + Set expectedTypes = new HashSet<>(); + expectedTypes.addAll(fileSource.getTypes()); + expectedTypes.addAll(uriInputSource.getTypes()); + Assert.assertEquals(expectedTypes, combiningInputSource.getTypes()); + } + @Test public void testEstimateNumSplits() { @@ -201,6 +223,13 @@ public class CombiningInputSourceTest files = fileList; } + @JsonIgnore + @Override + public Set getTypes() + { + return Collections.singleton("testFile"); + } + @JsonProperty public List getFiles() { @@ -261,6 +290,13 @@ public class CombiningInputSourceTest uris = uriList; } + @JsonIgnore + @Override + public Set getTypes() + { + return Collections.singleton("testUri"); + } + @JsonProperty public List getUris() { diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java new file mode 100644 index 00000000000..0d354d3b996 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InlineInputSourceTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class InlineInputSourceTest +{ + @Test + public void testGetTypes() + { + InlineInputSource inputSource = new InlineInputSource("data"); + Assert.assertEquals(Collections.singleton(InlineInputSource.TYPE_KEY), inputSource.getTypes()); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java new file mode 100644 index 00000000000..34c7850bf2b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputSourceTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.java.util.common.UOE; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.File; + +public class InputSourceTest +{ + private static InputSource INPUT_SOURCE = new InputSource() + { + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + return null; + } + }; + + @Test + public void testGetTypes() + { + Assert.assertThrows(UOE.class, () -> INPUT_SOURCE.getTypes()); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index 2917a2aa5f2..c72c52f462b 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -40,6 +40,7 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -88,6 +89,13 @@ public class LocalInputSourceTest Assert.assertEquals(source, fromJson); } + @Test + public void testGetTypes() + { + final LocalInputSource source = new LocalInputSource(new File("myFile").getAbsoluteFile(), "myFilter"); + Assert.assertEquals(Collections.singleton(LocalInputSource.TYPE_KEY), source.getTypes()); + } + @Test public void testEquals() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index b19aeaa2881..1820976e876 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -20,15 +20,20 @@ package org.apache.druid.indexing.overlord.supervisor; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.server.security.ResourceAction; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; /** * Used as a tombstone marker in the supervisors metadata table to indicate that the supervisor has been removed. @@ -111,6 +116,14 @@ public class NoopSupervisorSpec implements SupervisorSpec return type; } + @Nonnull + @JsonIgnore + @Override + public Set getInputSourceTypes() + { + return ImmutableSet.of(); + } + @Override @JsonProperty("source") public String getSource() diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 9b44cd08dd1..0386edc15c2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -19,11 +19,17 @@ package org.apache.druid.indexing.overlord.supervisor; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.server.security.ResourceAction; +import javax.annotation.Nonnull; import java.util.List; +import java.util.Set; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @@ -71,6 +77,22 @@ public interface SupervisorSpec */ String getType(); + /** + * @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if + * the task does not use any. Users can be given permission to access particular types of + * input sources but not others, using the + * {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config. + */ + @JsonIgnore + @Nonnull + default Set getInputSourceTypes() throws UnsupportedOperationException + { + throw new UOE(StringUtils.format( + "SuperviserSpec type [%s], does not support input source based security", + getType() + )); + } + /** * This API is only used for informational purposes in * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable diff --git a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java index eb612f70958..f85adf5965d 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java +++ b/server/src/main/java/org/apache/druid/metadata/input/InputSourceModule.java @@ -41,7 +41,7 @@ public class InputSourceModule implements DruidModule return ImmutableList.of( new SimpleModule("InputSourceModule") .registerSubtypes( - new NamedType(SqlInputSource.class, "sql") + new NamedType(SqlInputSource.class, SqlInputSource.TYPE_KEY) ) ); } diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java index c7dfbb7fa36..0064a343107 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java @@ -21,6 +21,7 @@ package org.apache.druid.metadata.input; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -35,15 +36,18 @@ import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; public class SqlInputSource extends AbstractInputSource implements SplittableInputSource { + static final String TYPE_KEY = "sql"; private final List sqls; private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector; private final ObjectMapper objectMapper; @@ -68,6 +72,14 @@ public class SqlInputSource extends AbstractInputSource implements SplittableInp this.objectMapper = objectMapper; } + @JsonIgnore + @Nonnull + @Override + public Set getTypes() + { + return Collections.singleton(TYPE_KEY); + } + @JsonProperty public List getSqls() { diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java index fd5fac09e51..c0fbe92df11 100644 --- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java @@ -64,4 +64,11 @@ public class NoopSupervisorSpecTest } Assert.assertNull(e); } + + @Test + public void testInputSourceTypes() + { + NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1")); + Assert.assertTrue(noopSupervisorSpec.getInputSourceTypes().isEmpty()); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java new file mode 100644 index 00000000000..4000fabad5b --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpecTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.java.util.common.UOE; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class SupervisorSpecTest +{ + private static final SupervisorSpec SUPERVISOR_SPEC = new SupervisorSpec() + { + @Override + public String getId() + { + return null; + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public List getDataSources() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + }; + + @Test + public void test() + { + Assert.assertThrows(UOE.class, () -> SUPERVISOR_SPEC.getInputSourceTypes()); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java index a83dec676bf..7a5ea7b2149 100644 --- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java @@ -57,6 +57,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -123,6 +124,17 @@ public class SqlInputSourceTest Assert.assertEquals(sqlInputSource, inputSourceFromJson); } + @Test + public void testGetTypes() + { + mapper.registerSubtypes(TestSerdeFirehoseConnector.class); + final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector( + new MetadataStorageConnectorConfig()); + final SqlInputSource sqlInputSource = + new SqlInputSource(SqlTestUtils.selectFrom(TABLE_1), true, testSerdeFirehoseConnector, mapper); + Assert.assertEquals(Collections.singleton(SqlInputSource.TYPE_KEY), sqlInputSource.getTypes()); + } + @Test public void testSingleSplit() throws Exception { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java index 71c532d736d..850b472bc83 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java @@ -51,14 +51,12 @@ public class SqlResourceCollectorShuttle extends SqlShuttle private final Set resourceActions; private final PlannerContext plannerContext; private final SqlValidator validator; - private final boolean inputSourceTypeSecurityEnabled; public SqlResourceCollectorShuttle(SqlValidator validator, PlannerContext plannerContext) { this.validator = validator; this.resourceActions = new HashSet<>(); this.plannerContext = plannerContext; - inputSourceTypeSecurityEnabled = plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity(); } @Override @@ -67,7 +65,7 @@ public class SqlResourceCollectorShuttle extends SqlShuttle if (call.getOperator() instanceof AuthorizableOperator) { resourceActions.addAll(((AuthorizableOperator) call.getOperator()).computeResources( call, - inputSourceTypeSecurityEnabled + plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity() )); }