Allow for Input source security in native task layer (#14003)

Fixes #13837.

### Description

This change allows for input source type security in the native task layer.

To enable this feature, the user must set the following property to true:

`druid.auth.enableInputSourceSecurity=true`

The default value for this property is false, which will continue the existing functionality of needing authorization to write to the respective datasource.

When this config is enabled, the users will be required to be authorized for the following resource action, in addition to write permission on the respective datasource.

`new ResourceAction(new Resource(ResourceType.EXTERNAL, {INPUT_SOURCE_TYPE}, Action.READ`

where `{INPUT_SOURCE_TYPE}` is the type of the input source being used;, http, inline, s3, etc..

Only tasks that provide a non-default implementation of the `getInputSourceResources` method can be submitted when config `druid.auth.enableInputSourceSecurity=true` is set. Otherwise, a 400 error will be thrown.
This commit is contained in:
zachjsh 2023-04-06 13:13:09 -04:00 committed by GitHub
parent 92912a6a2b
commit 5c0221375c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
64 changed files with 1610 additions and 68 deletions

View File

@ -31,6 +31,7 @@
<writeAnnotation name="org.powermock.api.easymock.annotation.Mock" />
</writeAnnotations>
</component>
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
@ -46,7 +47,7 @@
<option name="myDefaultNotNull" value="javax.annotation.Nonnull" />
<option name="myNullables">
<value>
<list size="12">
<list size="15">
<item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.Nullable" />
<item index="1" class="java.lang.String" itemvalue="javax.annotation.Nullable" />
<item index="2" class="java.lang.String" itemvalue="javax.annotation.CheckForNull" />
@ -59,12 +60,15 @@
<item index="9" class="java.lang.String" itemvalue="org.checkerframework.checker.nullness.compatqual.NullableType" />
<item index="10" class="java.lang.String" itemvalue="androidx.annotation.RecentlyNullable" />
<item index="11" class="java.lang.String" itemvalue="com.android.annotations.Nullable" />
<item index="12" class="java.lang.String" itemvalue="org.jspecify.nullness.Nullable" />
<item index="13" class="java.lang.String" itemvalue="org.eclipse.jdt.annotation.Nullable" />
<item index="14" class="java.lang.String" itemvalue="jakarta.annotation.Nullable" />
</list>
</value>
</option>
<option name="myNotNulls">
<value>
<list size="11">
<list size="15">
<item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.NotNull" />
<item index="1" class="java.lang.String" itemvalue="javax.annotation.Nonnull" />
<item index="2" class="java.lang.String" itemvalue="edu.umd.cs.findbugs.annotations.NonNull" />
@ -76,6 +80,10 @@
<item index="8" class="java.lang.String" itemvalue="org.checkerframework.checker.nullness.compatqual.NonNullType" />
<item index="9" class="java.lang.String" itemvalue="androidx.annotation.RecentlyNonNull" />
<item index="10" class="java.lang.String" itemvalue="com.android.annotations.NonNull" />
<item index="11" class="java.lang.String" itemvalue="jakarta.annotation.Nonnull" />
<item index="12" class="java.lang.String" itemvalue="lombok.NonNull" />
<item index="13" class="java.lang.String" itemvalue="org.jspecify.nullness.NonNull" />
<item index="14" class="java.lang.String" itemvalue="org.eclipse.jdt.annotation.NonNull" />
</list>
</value>
</option>
@ -84,7 +92,7 @@
<resource url="http://maven.apache.org/ASSEMBLY/2.0.0" location="$PROJECT_DIR$/.idea/xml-schemas/assembly-2.0.0.xsd" />
<resource url="http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd" location="$PROJECT_DIR$/.idea/xml-schemas/svg11.dtd" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="false" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/classes" />
</component>
</project>

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.azure;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
@ -35,12 +36,15 @@ import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
import org.apache.druid.storage.azure.AzureStorage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Abstracts the Azure storage system where input data is stored. Allows users to retrieve entities in
@ -77,6 +81,14 @@ public class AzureInputSource extends CloudObjectInputSource
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig");
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(SCHEME);
}
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.azure;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputSplit;
@ -265,6 +266,23 @@ public class AzureInputSourceTest extends EasyMockSupport
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}", actualToString);
}
@Test
public void test_getTypes_returnsExpectedTypes()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
azureInputSource = new AzureInputSource(
storage,
entityFactory,
azureCloudBlobIterableFactory,
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null
);
Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME), azureInputSource.getTypes());
}
@Test
public void abidesEqualsContract()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.google;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
@ -36,15 +37,19 @@ import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import org.apache.druid.storage.google.GoogleUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class GoogleCloudStorageInputSource extends CloudObjectInputSource
{
static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
private final GoogleStorage storage;
@ -65,6 +70,14 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
this.inputDataConfig = inputDataConfig;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
@Override
protected InputEntity createEntity(CloudObjectLocation location)
{

View File

@ -153,6 +153,14 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals(withObjects, serdeWithObjects);
}
@Test
public void testGetTypes()
{
final GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
Assert.assertEquals(Collections.singleton(GoogleCloudStorageInputSource.TYPE_KEY), inputSource.getTypes());
}
@Test
public void testWithUrisSplit() throws Exception
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -48,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@ -56,11 +59,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource<List<Path>>
{
static final String TYPE_KEY = HdfsStorageDruidModule.SCHEME;
private static final String PROP_PATHS = "paths";
private final List<String> inputPaths;
@ -91,6 +96,14 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
{
if (inputPaths instanceof String) {

View File

@ -50,7 +50,7 @@ import java.util.Properties;
*/
public class HdfsStorageDruidModule implements DruidModule
{
static final String SCHEME = "hdfs";
public static final String SCHEME = "hdfs";
private Properties props = null;
@Inject

View File

@ -163,6 +163,20 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
.inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
.build();
}
@Test
public void testGetTypes()
{
final Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:7020");
HdfsInputSource inputSource = HdfsInputSource.builder()
.paths("/foo/bar*")
.configuration(conf)
.inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
.build();
Assert.assertEquals(Collections.singleton(HdfsInputSource.TYPE_KEY), inputSource.getTypes());
}
}
public static class SerializeDeserializeTest

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@ -30,12 +31,20 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
public static final String INPUT_SOURCE_TYPE = "kafka";
private static final String TYPE = "index_kafka";
private final ObjectMapper configMapper;
@ -132,6 +141,17 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return Collections.singleton(new ResourceAction(
new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE),
Action.READ
));
}
@Override
public boolean supportsQueries()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.annotations.Json;
@ -35,13 +36,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String TASK_TYPE = "kafka";
static final String TASK_TYPE = "kafka";
@JsonCreator
public KafkaSupervisorSpec(
@ -92,6 +100,17 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
return TASK_TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceTypes()
{
return Collections.singleton(new ResourceAction(
new Resource(ResourceType.EXTERNAL, TASK_TYPE),
Action.READ
));
}
@Override
public String getSource()
{

View File

@ -111,6 +111,10 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -2672,6 +2676,45 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(task, task1);
}
@Test
public void testCorrectInputSources() throws Exception
{
// This is both a serde test and a regression test for https://github.com/apache/druid/issues/7724.
final KafkaIndexTask task = createTask(
"taskid",
NEW_DATA_SCHEMA.withTransformSpec(
new TransformSpec(
null,
ImmutableList.of(new ExpressionTransform("beep", "nofunc()", ExprMacroTable.nil()))
)
),
new KafkaIndexTaskIOConfig(
0,
"sequence",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of()),
ImmutableMap.of(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
INPUT_FORMAT,
null
)
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
KafkaIndexTask.INPUT_SOURCE_TYPE
), Action.READ)),
task.getInputSourceResources()
);
}
/**
* Wait for a task to consume certain offsets (inclusive).
*/

View File

@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
@ -433,6 +437,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
Long.MAX_VALUE,
(long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2)
);
Assert.assertEquals(
Collections.singleton(new ResourceAction(
new Resource(ResourceType.EXTERNAL, KafkaSupervisorSpec.TASK_TYPE),
Action.READ
)),
testableSupervisorSpec.getInputSourceTypes()
);
autoscaler.reset();
autoscaler.stop();

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -34,12 +35,20 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.utils.RuntimeInfo;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
public static final String INPUT_SOURCE_TYPE = "kinesis";
private static final String TYPE = "index_kinesis";
private static final Logger log = new Logger(KinesisIndexTask.class);
@ -147,6 +156,17 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return Collections.singleton(new ResourceAction(
new Resource(ResourceType.EXTERNAL, INPUT_SOURCE_TYPE),
Action.READ
));
}
@Override
public boolean supportsQueries()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.name.Named;
@ -38,13 +39,20 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String SUPERVISOR_TYPE = "kinesis";
static final String SUPERVISOR_TYPE = "kinesis";
private final AWSCredentialsConfig awsCredentialsConfig;
@JsonCreator
@ -114,6 +122,17 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
return SUPERVISOR_TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceTypes()
{
return Collections.singleton(new ResourceAction(
new Resource(ResourceType.EXTERNAL, SUPERVISOR_TYPE),
Action.READ
));
}
@Override
public String getSource()
{

View File

@ -36,6 +36,10 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -127,6 +131,14 @@ public class KinesisIndexTaskSerdeTest
Assert.assertEquals(ACCESS_KEY, awsCredentialsConfig.getAccessKey().getPassword());
Assert.assertEquals(SECRET_KEY, awsCredentialsConfig.getSecretKey().getPassword());
Assert.assertEquals(FILE_SESSION_CREDENTIALS, awsCredentialsConfig.getFileSessionCredentials());
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
KinesisIndexTask.INPUT_SOURCE_TYPE
), Action.READ)),
target.getInputSourceResources()
);
}
private static ObjectMapper createObjectMapper()

View File

@ -88,6 +88,10 @@ import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@ -4080,6 +4084,59 @@ public class KinesisSupervisorTest extends EasyMockSupport
testShardSplitPhaseThree(phaseTwoTasks);
}
@Test
public void testCorrectInputSources()
{
KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
null,
dataSchema,
null,
new KinesisSupervisorIOConfig(
STREAM,
null,
null,
null,
null,
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(
new Resource(ResourceType.EXTERNAL, KinesisSupervisorSpec.SUPERVISOR_TYPE),
Action.READ
)),
supervisorSpec.getInputSourceTypes()
);
}
private List<Task> testShardSplitPhaseOne() throws Exception
{
supervisorRecordSupplier.assign(EasyMock.anyObject());

View File

@ -30,6 +30,7 @@ import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
@ -56,13 +57,16 @@ import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
public class S3InputSource extends CloudObjectInputSource
{
public static final String TYPE_KEY = S3StorageDruidModule.SCHEME;
// We lazily initialize ServerSideEncryptingAmazonS3 to avoid costly s3 operation when we only need S3InputSource
// for stored information (such as for task logs) and not for ingestion.
// (This cost only applies for new ServerSideEncryptingAmazonS3 created with s3InputSourceConfig given).
@ -236,6 +240,14 @@ public class S3InputSource extends CloudObjectInputSource
this.maxRetries = maxRetries;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
private void applyAssumeRole(
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
S3InputSourceConfig s3InputSourceConfig,

View File

@ -336,6 +336,25 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}
@Test
public void testGetTypes()
{
final S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(Collections.singleton(S3InputSource.TYPE_KEY), inputSource.getTypes());
}
@Test
public void testS3InputSourceUseEndPointClientProxy()
{

View File

@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.base.Verify;
@ -90,6 +91,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -395,6 +397,14 @@ public class CompactionTask extends AbstractBatchIndexTask
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
public int getPriority()
{

View File

@ -67,11 +67,15 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
@ -85,9 +89,11 @@ import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class HadoopIndexTask extends HadoopTask implements ChatHandler
@ -193,6 +199,14 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
return "index_hadoop";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return Collections.singleton(new ResourceAction(new Resource(ResourceType.EXTERNAL, "hadoop"), Action.READ));
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -29,6 +30,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
@ -97,6 +99,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -130,6 +135,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
{
@ -288,6 +294,22 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (ingestionSchema.getIOConfig().firehoseFactory != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@GET
@Path("/unparseableEvents")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
@ -33,9 +34,12 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
@ -100,6 +104,14 @@ public class NoopTask extends AbstractTask
return "noop";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@JsonProperty
public long getRunTime()
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@ -36,10 +37,15 @@ import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmen
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialRangeSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseSubTask;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.Set;
/**
* Represents a task that can run on a worker. The general contracts surrounding Tasks are:
@ -138,6 +144,33 @@ public interface Task
*/
String getDataSource();
/**
* @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if
* the task does not use any. Users can be given permission to access particular types of
* input sources but not others, using the
* {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
* @throws UnsupportedOperationException if the given task type does not suppoert input source based security. Such
* would be the case, if the task uses firehose.
*/
@JsonIgnore
@Nonnull
default Set<ResourceAction> getInputSourceResources() throws UOE
{
throw new UOE(StringUtils.format(
"Task type [%s], does not support input source based security",
getType()
));
}
default UOE getInputSecurityOnFirehoseUnsupportedError()
{
throw new UOE(StringUtils.format(
"Input source based security cannot be performed '%s' task because it uses firehose."
+ " Change the tasks configuration, or disable `isEnableInputSourceSecurity`",
getType()
));
}
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.

View File

@ -20,11 +20,20 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
{
@ -56,4 +65,20 @@ public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
{
return SinglePhaseSubTask.OLD_TYPE_NAME;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
}

View File

@ -20,12 +20,14 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
@ -76,6 +78,9 @@ import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -270,6 +275,22 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{

View File

@ -21,11 +21,13 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@ -43,16 +45,23 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.partition.HashPartitioner;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
{
@ -133,6 +142,22 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{

View File

@ -20,10 +20,12 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
@ -49,14 +51,21 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* The worker task of {@link PartialDimensionDistributionParallelIndexTaskRunner}. This task
@ -174,6 +183,22 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{

View File

@ -20,7 +20,9 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
@ -33,6 +35,10 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultInde
import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.Interval;
@ -42,6 +48,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -131,6 +138,22 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{

View File

@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@ -35,14 +37,20 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -148,6 +156,22 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@ -27,6 +28,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
@ -70,6 +72,9 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -77,6 +82,7 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
@ -96,6 +102,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* The worker task of {@link SinglePhaseParallelIndexTaskRunner}. Similar to {@link IndexTask}, but this task
@ -190,6 +197,22 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
throw getInputSecurityOnFirehoseUnsupportedError();
}
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(ResourceType.EXTERNAL, i), Action.READ))
.collect(Collectors.toSet()) :
ImmutableSet.of();
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.http;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@ -61,6 +62,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.TaskLookup;
@ -73,6 +75,7 @@ import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
@ -106,6 +109,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -131,6 +135,8 @@ public class OverlordResource
private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
private final ProvisioningStrategy provisioningStrategy;
private final AuthConfig authConfig;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
@ -162,7 +168,8 @@ public class OverlordResource
AuditManager auditManager,
AuthorizerMapper authorizerMapper,
WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
ProvisioningStrategy provisioningStrategy
ProvisioningStrategy provisioningStrategy,
AuthConfig authConfig
)
{
this.taskMaster = taskMaster;
@ -174,6 +181,7 @@ public class OverlordResource
this.authorizerMapper = authorizerMapper;
this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
this.provisioningStrategy = provisioningStrategy;
this.authConfig = authConfig;
}
/**
@ -187,15 +195,24 @@ public class OverlordResource
@Produces(MediaType.APPLICATION_JSON)
public Response taskPost(final Task task, @Context final HttpServletRequest req)
{
final String dataSource = task.getDataSource();
final ResourceAction resourceAction = new ResourceAction(
new Resource(dataSource, ResourceType.DATASOURCE),
Action.WRITE
);
final Set<ResourceAction> resourceActions;
try {
resourceActions = getNeededResourceActionsForTask(task);
}
catch (UOE e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
ImmutableMap.of(
"error",
e.getMessage()
)
)
.build();
}
Access authResult = AuthorizationUtils.authorizeResourceAction(
Access authResult = AuthorizationUtils.authorizeAllResourceActions(
req,
resourceAction,
resourceActions,
authorizerMapper
);
@ -1086,6 +1103,18 @@ public class OverlordResource
}
}
@VisibleForTesting
Set<ResourceAction> getNeededResourceActionsForTask(Task task) throws UOE
{
final String dataSource = task.getDataSource();
final Set<ResourceAction> resourceActions = new HashSet<>();
resourceActions.add(new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE));
if (authConfig.isEnableInputSourceSecurity()) {
resourceActions.addAll(task.getInputSourceResources());
}
return resourceActions;
}
private List<TaskStatusPlus> securedTaskStatusPlus(
List<TaskStatusPlus> collectionToFilter,
@Nullable String dataSource,

View File

@ -80,6 +80,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -1280,6 +1281,20 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 60_000L)
public void testInputSourceResourcesThrowException()
{
// Expect 2 segments as we will hit maxTotalRows
expectPublishedSegments(2);
final AppenderatorDriverRealtimeIndexTask task =
makeRealtimeTask(null, Integer.MAX_VALUE, 1500L);
Assert.assertThrows(
UOE.class,
task::getInputSourceResources
);
}
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {

View File

@ -62,6 +62,10 @@ import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@ -87,6 +91,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -922,6 +927,15 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
null
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
LocalInputSource.TYPE_KEY
), Action.READ)),
indexTask.getInputSourceResources()
);
runTask(indexTask);
}

View File

@ -673,6 +673,25 @@ public class CompactionTaskTest
assertEquals(expectedFromJson, fromJson);
}
@Test
public void testInputSourceResources()
{
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask task = builder
.inputSpec(
new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
)
.tuningConfig(createTuningConfig())
.context(ImmutableMap.of("testKey", "testContext"))
.build();
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
@Test
public void testGetTuningConfigWithIndexTuningConfig()
{

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class HadoopIndexTaskTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testCorrectInputSourceTypes()
{
final HadoopIndexTask task = new HadoopIndexTask(
null,
new HadoopIngestionSpec(
new DataSchema(
"foo", null, new AggregatorFactory[0], new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P1D"))
),
null,
jsonMapper
), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null
),
null,
null,
"blah",
jsonMapper,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
"hadoop"
), Action.READ)),
task.getInputSourceResources()
);
}
}

View File

@ -91,6 +91,10 @@ import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFacto
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -209,6 +213,54 @@ public class IndexTaskTest extends IngestionTestBase
taskRunner = new TestTaskRunner();
}
@Test
public void testCorrectInputSourceTypes() throws IOException
{
File tmpDir = temporaryFolder.newFolder();
IndexTask indexTask = new IndexTask(
null,
null,
new IndexIngestionSpec(
new DataSchema(
"test-json",
DEFAULT_TIMESTAMP_SPEC,
new DimensionsSpec(
ImmutableList.of(
new StringDimensionSchema("ts"),
new StringDimensionSchema("dim"),
new LongDimensionSchema("valDim")
)
),
new AggregatorFactory[]{new LongSumAggregatorFactory("valMet", "val")},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014/P1D"))
),
null
),
new IndexIOConfig(
null,
new LocalInputSource(tmpDir, "druid*"),
DEFAULT_INPUT_FORMAT,
false,
false
),
createTuningConfigWithMaxRowsPerSegment(10, true)
),
null
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
LocalInputSource.TYPE_KEY
), Action.READ)),
indexTask.getInputSourceResources()
);
}
@Test
public void testIngestNullOnlyColumns() throws Exception
{

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.junit.Assert;
import org.junit.Test;
public class NoopTaskTest
{
@Test
public void testNullInputSources()
{
NoopTask task = new NoopTask("myID", null, null, 1, 0, null, null, null);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
}

View File

@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -199,6 +200,16 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
Assert.assertTrue(task.supportsQueries());
}
@Test(timeout = 60_000L)
public void testInputSourceTypes()
{
final RealtimeIndexTask task = makeRealtimeTask(null);
Assert.assertThrows(
UOE.class,
task::getInputSourceResources
);
}
@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception
{

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class TaskTest
{
private static final Task TASK = new Task()
{
@Override
public String getId()
{
return null;
}
@Override
public String getGroupId()
{
return null;
}
@Override
public TaskResource getTaskResource()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public String getNodeType()
{
return null;
}
@Override
public String getDataSource()
{
return null;
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
return null;
}
@Override
public boolean supportsQueries()
{
return false;
}
@Override
public String getClasspathPrefix()
{
return null;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return false;
}
@Override
public boolean canRestore()
{
return false;
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
return null;
}
@Override
public Map<String, Object> getContext()
{
return null;
}
};
@Test
public void testGetInputSourceTypes()
{
Assert.assertThrows(
UOE.class,
TASK::getInputSourceResources
);
}
}

View File

@ -44,6 +44,10 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.logging.log4j.core.LogEvent;
import org.easymock.Capture;
@ -114,6 +118,21 @@ public class PartialDimensionCardinalityTaskTest
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
}
@Test
public void hasCorrectInputSourceTypes()
{
PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
.build();
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
InlineInputSource.TYPE_KEY
), Action.READ)),
task.getInputSourceResources()
);
}
@Test
public void hasCorrectPrefixForAutomaticId()
{

View File

@ -37,9 +37,12 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.testing.junit.LoggerCaptureRule;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.logging.log4j.core.LogEvent;
@ -108,14 +111,6 @@ public class PartialDimensionDistributionTaskTest
.build();
}
@Test
public void serializesDeserializes()
{
PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder()
.build();
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
}
@Test
public void hasCorrectPrefixForAutomaticId()
{
@ -375,6 +370,22 @@ public class PartialDimensionDistributionTaskTest
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
}
@Test
public void testInputSourceResources()
{
PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder()
.build();
Assert.assertEquals(
Collections.singleton(
new ResourceAction(
new Resource(ResourceType.EXTERNAL, InlineInputSource.TYPE_KEY),
Action.READ
)),
task.getInputSourceResources()
);
}
private DimensionDistributionReport runTask(PartialDimensionDistributionTaskBuilder taskBuilder)
{

View File

@ -30,6 +30,10 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
@ -39,6 +43,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -86,6 +91,19 @@ public class PartialHashSegmentGenerateTaskTest
Assert.assertThat(id, Matchers.startsWith(PartialHashSegmentGenerateTask.TYPE));
}
@Test
public void hasCorrectInputSourceTypes()
{
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
LocalInputSource.TYPE_KEY
), Action.READ)),
target.getInputSourceResources()
);
}
@Test
public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsReturningAnalysisOfValidNumBuckets()
{

View File

@ -30,6 +30,10 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.hamcrest.Matchers;
import org.junit.Assert;
@ -101,6 +105,20 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu
TestHelper.testSerializesDeserializes(getObjectMapper(), task);
}
@Test
public void hasCorrectInputSourceTypes()
{
PartialRangeSegmentGenerateTask task = new PartialRangeSegmentGenerateTaskBuilder().build();
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
InlineInputSource.TYPE_KEY
), Action.READ)),
task.getInputSourceResources()
);
}
@Test
public void hasCorrectPrefixForAutomaticId()
{

View File

@ -46,6 +46,10 @@ import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
@ -172,6 +176,14 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
final TaskActionClient subTaskActionClient = createActionClient(subTask);
prepareTaskForLocking(subTask);
Assert.assertTrue(subTask.isReady(subTaskActionClient));
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
LocalInputSource.TYPE_KEY
), Action.READ)),
subTask.getInputSourceResources()
);
}
}

View File

@ -28,12 +28,17 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
public class SinglePhaseSubTaskSpecTest
@ -88,5 +93,13 @@ public class SinglePhaseSubTaskSpecTest
final byte[] json = mapper.writeValueAsBytes(expected);
final Map<String, Object> actual = mapper.readValue(json, Map.class);
Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type"));
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
ResourceType.EXTERNAL,
LocalInputSource.TYPE_KEY
), Action.READ)),
expected.getInputSourceResources()
);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -64,6 +65,8 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@ -87,6 +90,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
public class OverlordResourceTest
@ -95,6 +99,7 @@ public class OverlordResourceTest
private TaskMaster taskMaster;
private JacksonConfigManager configManager;
private ProvisioningStrategy provisioningStrategy;
private AuthConfig authConfig;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private HttpServletRequest req;
@ -110,6 +115,7 @@ public class OverlordResourceTest
taskRunner = EasyMock.createMock(TaskRunner.class);
configManager = EasyMock.createMock(JacksonConfigManager.class);
provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
authConfig = EasyMock.createMock(AuthConfig.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class);
taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@ -162,7 +168,8 @@ public class OverlordResourceTest
null,
authMapper,
workerTaskRunnerQueryAdapter,
provisioningStrategy
provisioningStrategy,
authConfig
);
}
@ -175,7 +182,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
}
@ -189,7 +197,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.getLeader();
@ -208,7 +217,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
// true
@ -253,7 +263,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource.getWaitingTasks(req)
@ -284,7 +295,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List) overlordResource
.getCompleteTasks(null, req).getEntity();
@ -325,7 +337,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List) overlordResource.getRunningTasks(null, req)
@ -373,7 +386,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, null, null, null, null, req)
@ -419,7 +433,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@ -465,7 +480,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(
@ -517,7 +533,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List) overlordResource
@ -566,7 +583,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@ -600,7 +618,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req)
@ -634,7 +653,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
String interval = "2010-01-01_P1D";
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@ -689,7 +709,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
// Verify that only the tasks of read access datasource are returned
@ -745,7 +766,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
// Verify that only the tasks of read access datasource are returned
@ -772,7 +794,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
// Verify that only the tasks of read access datasource are returned
@ -805,7 +828,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req)
@ -824,7 +848,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
Object responseObject = overlordResource
.getTasks("blah", "ds_test", null, null, null, req)
@ -840,6 +865,7 @@ public class OverlordResourceTest
{
expectedException.expect(ForbiddenException.class);
expectAuthorizationTokenCheck();
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.replay(
taskRunner,
@ -847,7 +873,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
Task task = NoopTask.create();
overlordResource.taskPost(task, req);
@ -857,6 +884,7 @@ public class OverlordResourceTest
public void testTaskPostDeniesDatasourceReadUser()
{
expectAuthorizationTokenCheck(Users.WIKI_READER);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.replay(
taskRunner,
@ -864,7 +892,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
// Verify that taskPost fails for user who has only datasource read access
@ -895,7 +924,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@ -924,7 +954,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response1 = overlordResource.getTaskPayload("mytask");
@ -973,7 +1004,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response1 = overlordResource.getTaskStatus("mytask");
@ -1031,7 +1063,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
@ -1057,7 +1090,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
Response response = overlordResource.getDatasourceLockedIntervals(null);
@ -1091,7 +1125,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
mockQueue,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@ -1142,7 +1177,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
mockQueue,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
@ -1164,7 +1200,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
@ -1185,7 +1222,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.enableWorker(host);
@ -1208,7 +1246,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.disableWorker(host);
@ -1231,7 +1270,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.enableWorker(host);
@ -1254,7 +1294,8 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
authConfig
);
final Response response = overlordResource.disableWorker(host);
@ -1277,7 +1318,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
configManager,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus());
@ -1296,7 +1338,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
configManager,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1316,7 +1359,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
configManager,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1337,7 +1381,8 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
configManager,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1384,7 +1429,8 @@ public class OverlordResourceTest
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy
provisioningStrategy,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1431,7 +1477,8 @@ public class OverlordResourceTest
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy
provisioningStrategy,
authConfig
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
@ -1439,6 +1486,108 @@ public class OverlordResourceTest
Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled()
{
final String dataSource = "dataSourceTest";
final String inputSourceType = "local";
Task task = EasyMock.createMock(Task.class);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
EasyMock.expect(task.getDataSource()).andReturn(dataSource);
EasyMock.expect(task.getInputSourceResources())
.andReturn(ImmutableSet.of(new ResourceAction(
new Resource(ResourceType.EXTERNAL, inputSourceType),
Action.READ
)));
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE),
new ResourceAction(new Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ)
);
Set<ResourceAction> resourceActions = overlordResource.getNeededResourceActionsForTask(task);
Assert.assertEquals(expectedResourceActions, resourceActions);
}
@Test
public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled()
{
final String dataSource = "dataSourceTest";
final UOE expectedException = new UOE("unsupported");
Task task = EasyMock.createMock(Task.class);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
EasyMock.expect(task.getId()).andReturn("taskId");
EasyMock.expect(task.getDataSource()).andReturn(dataSource);
EasyMock.expect(task.getInputSourceResources()).andThrow(expectedException);
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
final UOE e = Assert.assertThrows(
UOE.class,
() -> overlordResource.getNeededResourceActionsForTask(task)
);
Assert.assertEquals(expectedException, e);
}
@Test
public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled()
{
final String dataSource = "dataSourceTest";
final String inputSourceType = "local";
Task task = EasyMock.createMock(Task.class);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.expect(task.getDataSource()).andReturn(dataSource);
EasyMock.expect(task.getInputSourceResources())
.andReturn(ImmutableSet.of(new ResourceAction(
new Resource(ResourceType.EXTERNAL, inputSourceType),
Action.READ
)));
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE)
);
Set<ResourceAction> resourceActions = overlordResource.getNeededResourceActionsForTask(task);
Assert.assertEquals(expectedResourceActions, resourceActions);
}
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);

View File

@ -258,7 +258,8 @@ public class OverlordTest
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
workerTaskRunnerQueryAdapter,
null
null,
new AuthConfig()
);
Response response = overlordResource.getLeader();
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());

View File

@ -28,9 +28,12 @@ import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.UOE;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Set;
/**
* InputSource abstracts the storage system where input data is stored. It creates an {@link InputSourceReader}
@ -87,4 +90,16 @@ public interface InputSource
@Nullable InputFormat inputFormat,
File temporaryDirectory
);
/**
* The types of input sources uses. A set is returned here, as some InputSource implementation allow for
* combining of multiple input sources.
* @return The types of input sources uses
*/
@JsonIgnore
@Nonnull
default Set<String> getTypes()
{
throw new UOE("This inputSource does not support input source based security");
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSource;
@ -29,10 +30,13 @@ import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.java.util.common.Pair;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
/**
@ -61,6 +65,19 @@ public class CombiningInputSource extends AbstractInputSource implements Splitta
this.delegates = delegates;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
Set<String> types = new HashSet<>();
for (InputSource delegate : delegates) {
types.addAll(delegate.getTypes());
}
return types;
}
@JsonProperty
public List<SplittableInputSource> getDelegates()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
@ -34,12 +35,14 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.PasswordProvider;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
public class HttpInputSource extends AbstractInputSource implements SplittableInputSource<URI>
@ -69,6 +72,14 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
this.config = config;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
{
for (URI uri : uris) {

View File

@ -30,9 +30,12 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
public class InlineInputSource extends AbstractInputSource
@ -48,6 +51,14 @@ public class InlineInputSource extends AbstractInputSource
this.data = data;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
@JsonProperty
public String getData()
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
@ -45,12 +46,14 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CollectionUtils;
import org.apache.druid.utils.Streams;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -81,6 +84,14 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI
}
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
public LocalInputSource(File baseDir, String filter)
{
this(baseDir, filter, null);

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
@ -42,8 +43,11 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -66,6 +70,24 @@ public class CombiningInputSourceTest
Assert.assertEquals(combiningInputSource, fromJson);
}
@Test
public void testGetTypes()
{
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class));
final TestFileInputSource fileSource = new TestFileInputSource(ImmutableList.of(new File("myFile").getAbsoluteFile()));
final TestUriInputSource uriInputSource = new TestUriInputSource(
ImmutableList.of(URI.create("http://test.com/http-test")));
final CombiningInputSource combiningInputSource = new CombiningInputSource(ImmutableList.of(
fileSource,
uriInputSource
));
Set<String> expectedTypes = new HashSet<>();
expectedTypes.addAll(fileSource.getTypes());
expectedTypes.addAll(uriInputSource.getTypes());
Assert.assertEquals(expectedTypes, combiningInputSource.getTypes());
}
@Test
public void testEstimateNumSplits()
{
@ -201,6 +223,13 @@ public class CombiningInputSourceTest
files = fileList;
}
@JsonIgnore
@Override
public Set<String> getTypes()
{
return Collections.singleton("testFile");
}
@JsonProperty
public List<File> getFiles()
{
@ -261,6 +290,13 @@ public class CombiningInputSourceTest
uris = uriList;
}
@JsonIgnore
@Override
public Set<String> getTypes()
{
return Collections.singleton("testUri");
}
@JsonProperty
public List<URI> getUris()
{

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class InlineInputSourceTest
{
@Test
public void testGetTypes()
{
InlineInputSource inputSource = new InlineInputSource("data");
Assert.assertEquals(Collections.singleton(InlineInputSource.TYPE_KEY), inputSource.getTypes());
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.java.util.common.UOE;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
public class InputSourceTest
{
private static InputSource INPUT_SOURCE = new InputSource()
{
@Override
public boolean isSplittable()
{
return false;
}
@Override
public boolean needsFormat()
{
return false;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
return null;
}
};
@Test
public void testGetTypes()
{
Assert.assertThrows(UOE.class, () -> INPUT_SOURCE.getTypes());
}
}

View File

@ -40,6 +40,7 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -88,6 +89,13 @@ public class LocalInputSourceTest
Assert.assertEquals(source, fromJson);
}
@Test
public void testGetTypes()
{
final LocalInputSource source = new LocalInputSource(new File("myFile").getAbsoluteFile(), "myFilter");
Assert.assertEquals(Collections.singleton(LocalInputSource.TYPE_KEY), source.getTypes());
}
@Test
public void testEquals()
{

View File

@ -20,15 +20,20 @@
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Used as a tombstone marker in the supervisors metadata table to indicate that the supervisor has been removed.
@ -111,6 +116,14 @@ public class NoopSupervisorSpec implements SupervisorSpec
return type;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceTypes()
{
return ImmutableSet.of();
}
@Override
@JsonProperty("source")
public String getSource()

View File

@ -19,11 +19,17 @@
package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@ -71,6 +77,22 @@ public interface SupervisorSpec
*/
String getType();
/**
* @return The types of {@link org.apache.druid.data.input.InputSource} that the task uses. Empty set is returned if
* the task does not use any. Users can be given permission to access particular types of
* input sources but not others, using the
* {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
*/
@JsonIgnore
@Nonnull
default Set<ResourceAction> getInputSourceTypes() throws UnsupportedOperationException
{
throw new UOE(StringUtils.format(
"SuperviserSpec type [%s], does not support input source based security",
getType()
));
}
/**
* This API is only used for informational purposes in
* org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable

View File

@ -41,7 +41,7 @@ public class InputSourceModule implements DruidModule
return ImmutableList.<Module>of(
new SimpleModule("InputSourceModule")
.registerSubtypes(
new NamedType(SqlInputSource.class, "sql")
new NamedType(SqlInputSource.class, SqlInputSource.TYPE_KEY)
)
);
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.metadata.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@ -35,15 +36,18 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
public class SqlInputSource extends AbstractInputSource implements SplittableInputSource<String>
{
static final String TYPE_KEY = "sql";
private final List<String> sqls;
private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
private final ObjectMapper objectMapper;
@ -68,6 +72,14 @@ public class SqlInputSource extends AbstractInputSource implements SplittableInp
this.objectMapper = objectMapper;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(TYPE_KEY);
}
@JsonProperty
public List<String> getSqls()
{

View File

@ -64,4 +64,11 @@ public class NoopSupervisorSpecTest
}
Assert.assertNull(e);
}
@Test
public void testInputSourceTypes()
{
NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
Assert.assertTrue(noopSupervisorSpec.getInputSourceTypes().isEmpty());
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import org.apache.druid.java.util.common.UOE;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class SupervisorSpecTest
{
private static final SupervisorSpec SUPERVISOR_SPEC = new SupervisorSpec()
{
@Override
public String getId()
{
return null;
}
@Override
public Supervisor createSupervisor()
{
return null;
}
@Override
public List<String> getDataSources()
{
return null;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
};
@Test
public void test()
{
Assert.assertThrows(UOE.class, () -> SUPERVISOR_SPEC.getInputSourceTypes());
}
}

View File

@ -57,6 +57,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -123,6 +124,17 @@ public class SqlInputSourceTest
Assert.assertEquals(sqlInputSource, inputSourceFromJson);
}
@Test
public void testGetTypes()
{
mapper.registerSubtypes(TestSerdeFirehoseConnector.class);
final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector(
new MetadataStorageConnectorConfig());
final SqlInputSource sqlInputSource =
new SqlInputSource(SqlTestUtils.selectFrom(TABLE_1), true, testSerdeFirehoseConnector, mapper);
Assert.assertEquals(Collections.singleton(SqlInputSource.TYPE_KEY), sqlInputSource.getTypes());
}
@Test
public void testSingleSplit() throws Exception
{

View File

@ -51,14 +51,12 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
private final Set<ResourceAction> resourceActions;
private final PlannerContext plannerContext;
private final SqlValidator validator;
private final boolean inputSourceTypeSecurityEnabled;
public SqlResourceCollectorShuttle(SqlValidator validator, PlannerContext plannerContext)
{
this.validator = validator;
this.resourceActions = new HashSet<>();
this.plannerContext = plannerContext;
inputSourceTypeSecurityEnabled = plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity();
}
@Override
@ -67,7 +65,7 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
if (call.getOperator() instanceof AuthorizableOperator) {
resourceActions.addAll(((AuthorizableOperator) call.getOperator()).computeResources(
call,
inputSourceTypeSecurityEnabled
plannerContext.getPlannerToolbox().getAuthConfig().isEnableInputSourceSecurity()
));
}