Input source security fixes (#14266)

It was found that several supported tasks / input sources did not have implementations for the methods used by the input source security feature, causing these tasks and input sources to fail when used with this feature. This pr adds the needed missing implementations. Also securing the sampling endpoint with input source security, when enabled.
This commit is contained in:
zachjsh 2023-06-01 16:37:19 -07:00 committed by GitHub
parent e75fb8e8e3
commit 04a82da63d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 941 additions and 24 deletions

View File

@ -24,10 +24,12 @@ import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
@ -40,6 +42,7 @@ import org.apache.druid.storage.aliyun.OssInputDataConfig;
import org.apache.druid.storage.aliyun.OssStorageDruidModule;
import org.apache.druid.storage.aliyun.OssUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
@ -48,6 +51,7 @@ import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class OssInputSource extends CloudObjectInputSource
{
@ -94,6 +98,14 @@ public class OssInputSource extends CloudObjectInputSource
);
}
@Override
@JsonIgnore
@Nonnull
public Set<String> getTypes()
{
return ImmutableSet.of(OssStorageDruidModule.SCHEME);
}
@Nullable
@JsonProperty("properties")

View File

@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@ -61,6 +62,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.storage.aliyun.OssInputDataConfig;
import org.apache.druid.storage.aliyun.OssStorageDruidModule;
import org.apache.druid.storage.aliyun.OssUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
@ -557,6 +559,22 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EasyMock.verify(OSSCLIENT);
}
@Test
public void testGetTypes()
{
OssInputSource inputSource = new OssInputSource(
OSSCLIENT,
INPUT_DATA_CONFIG,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null
);
Assert.assertEquals(ImmutableSet.of(OssStorageDruidModule.SCHEME), inputSource.getTypes());
}
private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
{
final ObjectListing result = new ObjectListing();

View File

@ -44,7 +44,6 @@ import java.util.Set;
public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
public static final String INPUT_SOURCE_TYPE = "kafka";
private static final String TYPE = "index_kafka";
private final ObjectMapper configMapper;
@ -147,7 +146,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
public Set<ResourceAction> getInputSourceResources()
{
return Collections.singleton(new ResourceAction(
new Resource(INPUT_SOURCE_TYPE, ResourceType.EXTERNAL),
new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
Action.READ
));
}

View File

@ -33,6 +33,8 @@ import java.util.List;
public class KafkaIndexTaskModule implements DruidModule
{
public static final String SCHEME = "kafka";
@Override
public List<? extends Module> getJacksonModules()
{
@ -40,15 +42,15 @@ public class KafkaIndexTaskModule implements DruidModule
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(KafkaIndexTask.class, "index_kafka"),
new NamedType(KafkaDataSourceMetadata.class, "kafka"),
new NamedType(KafkaIndexTaskIOConfig.class, "kafka"),
new NamedType(KafkaDataSourceMetadata.class, SCHEME),
new NamedType(KafkaIndexTaskIOConfig.class, SCHEME),
// "KafkaTuningConfig" is not the ideal name, but is needed for backwards compatibility.
// (Older versions of Druid didn't specify a type name and got this one by default.)
new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"),
new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
new NamedType(KafkaSupervisorSpec.class, "kafka"),
new NamedType(KafkaSamplerSpec.class, "kafka"),
new NamedType(KafkaInputFormat.class, "kafka")
new NamedType(KafkaSupervisorTuningConfig.class, SCHEME),
new NamedType(KafkaSupervisorSpec.class, SCHEME),
new NamedType(KafkaSamplerSpec.class, SCHEME),
new NamedType(KafkaInputFormat.class, SCHEME)
)
);
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
@ -28,10 +29,18 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
{
@ -69,4 +78,21 @@ public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
@Override
public String getType()
{
return KafkaIndexTaskModule.SCHEME;
}
@Override
@JsonIgnore
@Nonnull
public Set<ResourceAction> getInputSourceResources() throws UOE
{
return Collections.singleton(new ResourceAction(
new Resource(KafkaIndexTaskModule.SCHEME, ResourceType.EXTERNAL),
Action.READ
));
}
}

View File

@ -2709,7 +2709,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
KafkaIndexTask.INPUT_SOURCE_TYPE,
KafkaIndexTaskModule.SCHEME,
ResourceType.EXTERNAL
), Action.READ)),
task.getInputSourceResources()

View File

@ -52,6 +52,10 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -64,6 +68,7 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -541,4 +546,63 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
expectedException.expectMessage("Invalid url in bootstrap.servers");
samplerSpec.sample();
}
@Test
public void testGetInputSourceResources()
{
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
DATA_SCHEMA,
null,
new KafkaSupervisorIOConfig(
TOPIC,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null,
// invalid bootstrap server
ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
KafkaIndexTaskModule.SCHEME,
ResourceType.EXTERNAL
), Action.READ)),
samplerSpec.getInputSourceResources()
);
}
}

View File

@ -48,7 +48,6 @@ import java.util.Set;
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
public static final String INPUT_SOURCE_TYPE = "kinesis";
private static final String TYPE = "index_kinesis";
private static final Logger log = new Logger(KinesisIndexTask.class);
@ -162,7 +161,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
public Set<ResourceAction> getInputSourceResources()
{
return Collections.singleton(new ResourceAction(
new Resource(INPUT_SOURCE_TYPE, ResourceType.EXTERNAL),
new Resource(KinesisIndexingServiceModule.SCHEME, ResourceType.EXTERNAL),
Action.READ
));
}

View File

@ -36,6 +36,7 @@ import java.util.List;
public class KinesisIndexingServiceModule implements DruidModule
{
public static final String AWS_SCOPE = "kinesis";
public static final String SCHEME = "kinesis";
static final String PROPERTY_BASE = "druid.kinesis";
@Override
@ -45,11 +46,11 @@ public class KinesisIndexingServiceModule implements DruidModule
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(KinesisIndexTask.class, "index_kinesis"),
new NamedType(KinesisDataSourceMetadata.class, "kinesis"),
new NamedType(KinesisIndexTaskIOConfig.class, "kinesis"),
new NamedType(KinesisSupervisorTuningConfig.class, "kinesis"),
new NamedType(KinesisSupervisorSpec.class, "kinesis"),
new NamedType(KinesisSamplerSpec.class, "kinesis")
new NamedType(KinesisDataSourceMetadata.class, SCHEME),
new NamedType(KinesisIndexTaskIOConfig.class, SCHEME),
new NamedType(KinesisSupervisorTuningConfig.class, SCHEME),
new NamedType(KinesisSupervisorSpec.class, SCHEME),
new NamedType(KinesisSamplerSpec.class, SCHEME)
)
);
}

View File

@ -30,8 +30,16 @@ import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfi
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Set;
public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
{
@ -77,4 +85,20 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
tuningConfig.isUseListShards()
);
}
@Override
public String getType()
{
return KinesisIndexingServiceModule.SCHEME;
}
@Nonnull
@Override
public Set<ResourceAction> getInputSourceResources() throws UOE
{
return Collections.singleton(new ResourceAction(
new Resource(KinesisIndexingServiceModule.SCHEME, ResourceType.EXTERNAL),
Action.READ
));
}
}

View File

@ -134,7 +134,7 @@ public class KinesisIndexTaskSerdeTest
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
KinesisIndexTask.INPUT_SOURCE_TYPE,
KinesisIndexingServiceModule.SCHEME,
ResourceType.EXTERNAL
), Action.READ)),
target.getInputSourceResources()

View File

@ -53,6 +53,10 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
@ -248,6 +252,66 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
runSamplerAndCompareResponse(samplerSpec, false);
}
@Test
public void testGetInputSourceResources()
{
KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
null,
DATA_SCHEMA,
null,
new KinesisSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
null,
null,
null,
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
false
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new InputSourceSampler(new DefaultObjectMapper()),
null
);
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
KinesisIndexingServiceModule.SCHEME,
ResourceType.EXTERNAL
), Action.READ)),
samplerSpec.getInputSourceResources()
);
}
private void runSamplerAndCompareResponse(SamplerSpec samplerSpec, boolean useInputFormat) throws InterruptedException
{
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -29,11 +30,14 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Set;
/**
* An {@link InputSource} that returns nothing (no rows).
@ -88,4 +92,11 @@ public class NilInputSource implements InputSource
}
};
}
@Nonnull
@Override
public Set<String> getTypes() throws UOE
{
return ImmutableSet.of();
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.input.external;
import org.apache.druid.msq.input.NilInputSource;
import org.junit.Assert;
import org.junit.Test;
public class NilInputSourceTest
{
private static final NilInputSource NIL_INPUT_SOURCE = NilInputSource.instance();
@Test
public void testGetTypes()
{
Assert.assertTrue(NIL_INPUT_SOURCE.getTypes().isEmpty());
}
}

View File

@ -32,6 +32,8 @@ import java.util.List;
public class IndexingServiceInputSourceModule implements DruidModule
{
public static final String GENERATOR_SCHEME = "generator";
@Override
public List<? extends Module> getJacksonModules()
{
@ -39,7 +41,7 @@ public class IndexingServiceInputSourceModule implements DruidModule
new SimpleModule("IndexingServiceInputSourceModule")
.registerSubtypes(
new NamedType(DruidInputSource.class, "druid"),
new NamedType(GeneratorInputSource.class, "generator")
new NamedType(GeneratorInputSource.class, GENERATOR_SCHEME)
)
);
}

View File

@ -62,6 +62,7 @@ import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
@ -87,10 +88,12 @@ import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nonnull;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -104,6 +107,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -213,6 +217,17 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
return "index_realtime_appenderator";
}
@Override
@JsonIgnore
@Nonnull
public Set<ResourceAction> getInputSourceResources() throws UOE
{
throw new UOE(StringUtils.format(
"Task type [%s], does not support input source based security",
getType()
));
}
@Override
public String getNodeType()
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
@ -28,11 +29,14 @@ import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ArchiveTask extends AbstractFixedIntervalTask
{
@ -59,6 +63,14 @@ public class ArchiveTask extends AbstractFixedIntervalTask
return "archive";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
@ -29,11 +30,14 @@ import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MoveTask extends AbstractFixedIntervalTask
{
@ -67,6 +71,14 @@ public class MoveTask extends AbstractFixedIntervalTask
return "move";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{

View File

@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
@ -66,13 +67,16 @@ import org.apache.druid.segment.realtime.plumber.Plumbers;
import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
@ -179,6 +183,17 @@ public class RealtimeIndexTask extends AbstractTask
return "index_realtime";
}
@Override
@JsonIgnore
@Nonnull
public Set<ResourceAction> getInputSourceResources() throws UOE
{
throw new UOE(StringUtils.format(
"Task type [%s], does not support input source based security",
getType()
));
}
@Override
public String getNodeType()
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
@ -28,12 +29,15 @@ import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentMetadataUpdateAction;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class RestoreTask extends AbstractFixedIntervalTask
{
@ -60,6 +64,14 @@ public class RestoreTask extends AbstractFixedIntervalTask
return "restore";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{

View File

@ -20,20 +20,25 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Table;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* {@link ParallelIndexTaskRunner} for the phase to merge generic partitioned segments in multi-phase parallel indexing.
@ -118,6 +123,14 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
return TYPE;
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@Override
BuildingShardSpec<?> createShardSpec(TaskToolbox toolbox, Interval interval, int partitionId)
{

View File

@ -21,12 +21,14 @@ package org.apache.druid.indexing.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
@ -64,6 +66,7 @@ import org.apache.druid.utils.Streams;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
@ -75,6 +78,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
@ -88,6 +92,8 @@ import java.util.stream.Stream;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
{
public static final String TYPE_KEY = "druid";
private static final Logger LOG = new Logger(DruidInputSource.class);
/**
@ -179,6 +185,14 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return ImmutableSet.of(TYPE_KEY);
}
@JsonProperty
public String getDataSource()
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSource;
@ -34,6 +35,7 @@ import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -41,12 +43,15 @@ import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@ -106,6 +111,14 @@ public class GeneratorInputSource extends AbstractInputSource implements Splitta
this.timestampIncrement = timestampIncrement != null ? timestampIncrement : DEFAULT_TIMESTAMP_INCREMENT;
}
@JsonIgnore
@Nonnull
@Override
public Set<String> getTypes()
{
return Collections.singleton(IndexingServiceInputSourceModule.GENERATOR_SCHEME);
}
@Override
public Stream<InputSplit<Long>> createSplits(
InputFormat inputFormat,

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.sampler;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.client.indexing.SamplerResponse;
@ -28,9 +29,17 @@ import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Set;
import java.util.stream.Collectors;
public class IndexTaskSamplerSpec implements SamplerSpec
{
@ -80,4 +89,21 @@ public class IndexTaskSamplerSpec implements SamplerSpec
{
return inputSourceSampler.sample(inputSource, inputFormat, dataSchema, samplerConfig);
}
@Override
public String getType()
{
return SamplerModule.INDEX_SCHEME;
}
@Override
@JsonIgnore
@Nonnull
public Set<ResourceAction> getInputSourceResources() throws UOE
{
return inputSource.getTypes()
.stream()
.map(i -> new ResourceAction(new Resource(i, ResourceType.EXTERNAL), Action.READ))
.collect(Collectors.toSet());
}
}

View File

@ -33,13 +33,15 @@ import java.util.List;
public class SamplerModule implements DruidModule
{
public static final String INDEX_SCHEME = "index";
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(IndexTaskSamplerSpec.class, "index")
new NamedType(IndexTaskSamplerSpec.class, INDEX_SCHEME)
)
);
}

View File

@ -20,26 +20,67 @@
package org.apache.druid.indexing.overlord.sampler;
import com.google.common.base.Preconditions;
import com.sun.jersey.spi.container.ResourceFilters;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.Set;
@Path("/druid/indexer/v1/sampler")
public class SamplerResource
{
private final AuthorizerMapper authorizerMapper;
private final AuthConfig authConfig;
private static final ResourceAction STATE_RESOURCE_WRITE =
new ResourceAction(Resource.STATE_RESOURCE, Action.WRITE);
@Inject
public SamplerResource(
final AuthorizerMapper authorizerMapper,
final AuthConfig authConfig
)
{
this.authorizerMapper = authorizerMapper;
this.authConfig = authConfig;
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public SamplerResponse post(final SamplerSpec sampler)
public SamplerResponse post(final SamplerSpec sampler, @Context final HttpServletRequest req)
{
return Preconditions.checkNotNull(sampler, "Request body cannot be empty").sample();
Preconditions.checkNotNull(sampler, "Request body cannot be empty");
Set<ResourceAction> resourceActions = new HashSet<>();
resourceActions.add(STATE_RESOURCE_WRITE);
if (authConfig.isEnableInputSourceSecurity()) {
resourceActions.addAll(sampler.getInputSourceResources());
}
Access authResult = AuthorizationUtils.authorizeAllResourceActions(
req,
resourceActions,
authorizerMapper
);
if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.getMessage());
}
return sampler.sample();
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Test;
public class ArchiveTaskTest
{
@Test
public void testGetInputSourceResources()
{
final ArchiveTask task = new ArchiveTask(
null,
"foo",
Intervals.of("2010-01-01/P1D"),
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
@ -127,6 +128,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
@ -139,6 +141,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
@ -152,6 +155,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -2188,6 +2192,14 @@ public class CompactionTaskTest
return "compact";
}
@Nonnull
@JsonIgnore
@Override
public Set<ResourceAction> getInputSourceResources()
{
return ImmutableSet.of();
}
@JsonProperty
public Interval getInterval()
{

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Test;
public class MoveTaskTest
{
@Test
public void testGetInputSourceResources()
{
final MoveTask task = new MoveTask(
null,
"foo",
Intervals.of("2010-01-01/P1D"),
ImmutableMap.of("bucket", "hey", "baseKey", "what"),
null,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Test;
public class RestoreTaskTest
{
@Test
public void testGetInputSourceResources()
{
final RestoreTask task = new RestoreTask(
null,
"foo",
Intervals.of("2010-01-01/P1D"),
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
}

View File

@ -143,4 +143,10 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
ParallelIndexTestingFactory.CONTEXT
);
}
@Test
public void testGetInputSourceResources()
{
Assert.assertTrue(target.getInputSourceResources().isEmpty());
}
}

View File

@ -299,4 +299,25 @@ public class DruidInputSourceTest
Assert.assertTrue(columnsFilter.apply(column));
Assert.assertFalse(columnsFilter.apply(metricName));
}
@Test
public void testGetTypes()
{
String datasource = "foo";
Interval interval = Intervals.of("2000/2001");
DruidInputSource druidInputSource = new DruidInputSource(
datasource,
interval,
null,
null,
ImmutableList.of("a"),
ImmutableList.of("b"),
indexIO,
coordinatorClient,
segmentCacheManagerFactory,
retryPolicyFactory,
taskConfig
);
Assert.assertEquals(ImmutableSet.of(DruidInputSource.TYPE_KEY), druidInputSource.getTypes());
}
}

View File

@ -22,10 +22,12 @@ package org.apache.druid.indexing.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -162,4 +164,21 @@ public class GeneratorInputSourceTest
((GeneratorInputSource) inputSource.withSplit(new InputSplit<>(2048L))).getSeed()
);
}
@Test
public void testGetTypes()
{
GeneratorInputSource inputSource = new GeneratorInputSource(
"basic",
null,
1000,
2,
1024L,
DateTimes.nowUtc().getMillis(),
1000,
1.0
);
Assert.assertEquals(ImmutableSet.of(IndexingServiceInputSourceModule.GENERATOR_SCHEME), inputSource.getTypes());
}
}

View File

@ -32,6 +32,10 @@ import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@ -42,6 +46,7 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
public class IndexTaskSamplerSpecTest extends EasyMockSupport
{
@ -124,5 +129,13 @@ public class IndexTaskSamplerSpecTest extends EasyMockSupport
SamplerConfig samplerConfig = capturedSamplerConfig.getValue();
Assert.assertEquals(123, samplerConfig.getNumRows());
Assert.assertEquals(2345, samplerConfig.getTimeoutMs());
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
LocalInputSource.TYPE_KEY,
ResourceType.EXTERNAL
), Action.READ)),
spec.getInputSourceResources()
);
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.sampler;
import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
public class SamplerResourceTest
{
private HttpServletRequest req;
private AuthConfig authConfig;
private SamplerSpec samplerSpec;
private SamplerResource samplerResource;
private static class Users
{
private static final String INPUT_SOURCE_ALLOWED = "inputSourceAllowed";
private static final String INPUT_SOURCE_DISALLOWED = "inputSourceDisallowed";
}
private static final AuthorizerMapper AUTH_MAPPER = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
return new Authorizer()
{
@Override
public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
{
final String username = authenticationResult.getIdentity();
switch (resource.getType()) {
case ResourceType.EXTERNAL:
return new Access(
action == Action.READ && Users.INPUT_SOURCE_ALLOWED.equals(username)
);
default:
return new Access(true);
}
}
};
}
};
@Before
public void setUp()
{
req = EasyMock.createStrictMock(HttpServletRequest.class);
authConfig = EasyMock.createMock(AuthConfig.class);
samplerSpec = EasyMock.createMock(SamplerSpec.class);
samplerResource = new SamplerResource(AUTH_MAPPER, authConfig);
}
@Test
public void test_post_properResourcesAuthorized()
{
expectAuthorizationTokenCheck(Users.INPUT_SOURCE_DISALLOWED);
Authorizer mockAuthorizer = EasyMock.createMock(Authorizer.class);
AuthorizerMapper mockAuthMapper = EasyMock.createMock(AuthorizerMapper.class);
EasyMock.expect(mockAuthMapper.getAuthorizer("druid")).andReturn(mockAuthorizer);
EasyMock.expect(mockAuthorizer.authorize(
EasyMock.anyObject(AuthenticationResult.class),
EasyMock.eq(Resource.STATE_RESOURCE),
EasyMock.eq(Action.WRITE))).andReturn(Access.OK);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.expect(samplerSpec.sample()).andReturn(null);
EasyMock.replay(
req,
authConfig,
mockAuthorizer,
mockAuthMapper,
samplerSpec
);
samplerResource = new SamplerResource(mockAuthMapper, authConfig);
samplerResource.post(samplerSpec, req);
}
@Test
public void test_post_inputSourceSecurityEnabledAndinputSourceDisAllowed_throwsAuthError()
{
expectAuthorizationTokenCheck(Users.INPUT_SOURCE_DISALLOWED);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
EasyMock.expect(samplerSpec.getInputSourceResources()).andReturn(
Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)));
EasyMock.replay(
req,
authConfig,
samplerSpec
);
Assert.assertThrows(ForbiddenException.class, () -> samplerResource.post(samplerSpec, req));
}
@Test
public void test_post_inputSourceSecurityEnabledAndinputSourceAllowed_samples()
{
expectAuthorizationTokenCheck(Users.INPUT_SOURCE_ALLOWED);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
EasyMock.expect(samplerSpec.getInputSourceResources()).andReturn(
Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)));
EasyMock.expect(samplerSpec.sample()).andReturn(null);
EasyMock.replay(
req,
authConfig,
samplerSpec
);
samplerResource.post(samplerSpec, req);
}
@Test
public void test_post_inputSourceSecurityDisabledAndinputSourceDisAllowed_samples()
{
expectAuthorizationTokenCheck(Users.INPUT_SOURCE_DISALLOWED);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.expect(samplerSpec.sample()).andReturn(null);
EasyMock.replay(
req,
authConfig,
samplerSpec
);
samplerResource.post(samplerSpec, req);
}
@Test
public void test_post_inputSourceSecurityEnabledAndinputSourcNotSupported_throwsUOE()
{
expectAuthorizationTokenCheck(Users.INPUT_SOURCE_ALLOWED);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
EasyMock.expect(samplerSpec.getInputSourceResources()).andThrow(
new UOE("input source type 'test' does not support input source security feature"));
EasyMock.expect(samplerSpec.sample()).andReturn(null);
EasyMock.replay(
req,
authConfig,
samplerSpec
);
Assert.assertThrows(UOE.class, () -> samplerResource.post(samplerSpec, req));
}
private void expectAuthorizationTokenCheck(String username)
{
AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null);
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(authenticationResult)
.atLeastOnce();
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
EasyMock.expectLastCall().anyTimes();
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
}
}

View File

@ -19,10 +19,46 @@
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface SamplerSpec
{
SamplerResponse sample();
/**
* Returns the type of this sampler type.
*
* @return sampler spec type label
*/
@Nullable
default String getType()
{
return null;
}
/**
* @return The types of {@link org.apache.druid.data.input.InputSource} that the sampler spec uses.
* Empty set is returned if the sampler spec does not use any. Users can be given permission to access
* particular types of input sources but not others, using the
* {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
*/
@JsonIgnore
@Nonnull
default Set<ResourceAction> getInputSourceResources() throws UOE
{
throw new UOE(StringUtils.format(
"SamplerSpec type [%s], does not support input source based security",
getType()
));
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client.indexing;
import org.apache.druid.java.util.common.UOE;
import org.junit.Assert;
import org.junit.Test;
public class SamplerSpecTest
{
private static final SamplerSpec SAMPLER_SPEC = new SamplerSpec()
{
@Override
public SamplerResponse sample()
{
return null;
}
};
@Test
public void testGetType()
{
Assert.assertNull(SAMPLER_SPEC.getType());
}
@Test
public void testGetInputSourceResources()
{
Assert.assertThrows(UOE.class, SAMPLER_SPEC::getInputSourceResources);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.Module;
@ -40,6 +41,7 @@ import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.query.Query;
@ -69,6 +71,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
@ -77,6 +80,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
@ -448,6 +452,14 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
files = fileList;
}
@Override
@JsonIgnore
@Nonnull
public Set<String> getTypes()
{
throw new UOE("This inputSource does not support input source based security");
}
@JsonProperty
public List<File> getFiles()
{