From f2d6100124dbe7cbc92ad91d28bd12a1800a1f2a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 8 Oct 2021 10:39:48 +0530 Subject: [PATCH] Require Datasource WRITE authorization for Supervisor and Task access (#11718) Follow up PR for #11680 Description Supervisor and Task APIs are related to ingestion and must always require Datasource WRITE authorization even if they are purely informative. Changes Check Datasource WRITE in SystemSchema for tables "supervisors" and "tasks" Check Datasource WRITE for APIs /supervisor/history and /supervisor/{id}/history Check Datasource for all Indexing Task APIs --- docs/operations/security-user-auth.md | 3 +- .../indexing/common/task/AbstractTask.java | 16 + .../AppenderatorDriverRealtimeIndexTask.java | 5 +- .../indexing/common/task/HadoopIndexTask.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 7 +- .../indexing/common/task/IndexTaskUtils.java | 13 +- .../parallel/ParallelIndexSupervisorTask.java | 38 +- .../batch/parallel/SinglePhaseSubTask.java | 7 +- .../overlord/http/OverlordResource.java | 6 +- .../supervisor/SupervisorResource.java | 8 +- .../SeekableStreamIndexTaskRunner.java | 32 +- .../indexing/common/task/IndexTaskTest.java | 84 ++++ .../overlord/http/OverlordResourceTest.java | 179 +++++++- .../SupervisorResourceFilterTest.java | 2 +- .../supervisor/SupervisorResourceTest.java | 9 + ...SeekableStreamIndexTaskRunnerAuthTest.java | 393 ++++++++++++++++++ .../docker/ldap-configs/bootstrap.ldif | 72 ++-- .../AbstractAuthConfigurationTest.java | 175 +++++--- .../ITBasicAuthConfigurationTest.java | 36 +- .../ITBasicAuthLdapConfigurationTest.java | 27 +- .../sql/calcite/schema/SystemSchema.java | 4 +- .../sql/calcite/schema/SystemSchemaTest.java | 359 ++++++++++------ 22 files changed, 1168 insertions(+), 310 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java diff --git a/docs/operations/security-user-auth.md b/docs/operations/security-user-auth.md index 54f4317a708..e5e5cab1bcd 100644 --- a/docs/operations/security-user-auth.md +++ b/docs/operations/security-user-auth.md @@ -139,7 +139,8 @@ Queries on the [system schema tables](../querying/sql.md#system-schema) require - `segments`: Segments will be filtered based on DATASOURCE READ permissions. - `servers`: The user requires STATE READ permissions. - `server_segments`: The user requires STATE READ permissions and segments will be filtered based on DATASOURCE READ permissions. -- `tasks`: Tasks will be filtered based on DATASOURCE READ permissions. +- `tasks`: Druid filters tasks according to DATASOURCE WRITE permissions. +- `supervisors`: Druid filters supervisors according to DATASOURCE WRITE permissions. ## Configuration Propagation diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 8964a1e1c68..e31270c3b1c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -30,9 +30,12 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.ForbiddenException; import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -164,6 +167,19 @@ public abstract class AbstractTask implements Task return TaskStatus.success(getId()); } + /** + * Authorizes WRITE action on a task's datasource + * + * @throws ForbiddenException if not authorized + */ + public void authorizeRequestForDatasourceWrite( + HttpServletRequest request, + AuthorizerMapper authorizerMapper + ) throws ForbiddenException + { + IndexTaskUtils.authorizeRequestForDatasourceWrite(request, dataSource, authorizerMapper); + } + @Override public boolean equals(Object o) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 8908014adfb..4e1c5e7a275 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -84,7 +84,6 @@ import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory; import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import org.apache.druid.segment.realtime.plumber.Committers; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -530,7 +529,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements @Context final HttpServletRequest req ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); Map returnMap = new HashMap<>(); Map totalsMap = new HashMap<>(); Map averagesMap = new HashMap<>(); @@ -556,7 +555,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements @Context final HttpServletRequest req ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); List events = IndexTaskUtils.getMessagesFromSavedParseExceptions( parseExceptionHandler.getSavedParseExceptions() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index b04097c9353..97e5ec021d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -63,7 +63,6 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.hadoop.mapred.JobClient; @@ -634,7 +633,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler @QueryParam("windows") List windows ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); Map returnMap = new HashMap<>(); Map totalsMap = new HashMap<>(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index a993b539076..c68a2b44c57 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -95,7 +95,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -285,7 +284,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetUnparseableEvents(full)).build(); } @@ -394,7 +393,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetRowStats(full)).build(); } @@ -406,7 +405,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); Map returnMap = new HashMap<>(); Map ingestionStatsAndErrors = new HashMap<>(); Map payload = new HashMap<>(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java index abc6b923e1a..5be50b6c7d3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java @@ -56,28 +56,25 @@ public class IndexTaskUtils } /** - * Authorizes action to be performed on a task's datasource + * Authorizes WRITE action on a task's datasource * - * @return authorization result + * @throws ForbiddenException if not authorized */ - public static Access datasourceAuthorizationCheck( + public static void authorizeRequestForDatasourceWrite( final HttpServletRequest req, - Action action, String datasource, AuthorizerMapper authorizerMapper - ) + ) throws ForbiddenException { ResourceAction resourceAction = new ResourceAction( new Resource(datasource, ResourceType.DATASOURCE), - action + Action.WRITE ); Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); if (!access.isAllowed()) { throw new ForbiddenException(access.toString()); } - - return access; } public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final Task task) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 201a48e8e63..3dfc9c7213d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -53,7 +53,6 @@ import org.apache.druid.indexing.common.task.CurrentSubTaskHolder; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; -import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; @@ -75,8 +74,6 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.realtime.firehose.ChatHandler; -import org.apache.druid.segment.realtime.firehose.ChatHandlers; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BuildingShardSpec; @@ -1176,7 +1173,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Context final HttpServletRequest req ) { - ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); if (toolbox == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1255,12 +1252,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Context final HttpServletRequest req ) { - ChatHandlers.authorizationCheck( - req, - Action.WRITE, - getDataSource(), - authorizerMapper - ); + authorizeRequestForDatasourceWrite(req, authorizerMapper); if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); } else { @@ -1278,7 +1270,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getMode(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(isParallelMode() ? "parallel" : "sequential").build(); } @@ -1287,7 +1279,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getPhaseName(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); if (isParallelMode()) { final ParallelIndexTaskRunner runner = getCurrentRunner(); if (runner == null) { @@ -1305,7 +1297,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getProgress(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1319,7 +1311,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getRunningTasks(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1333,7 +1325,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getSubTaskSpecs(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1347,7 +1339,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1361,7 +1353,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1375,7 +1367,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getSubTaskSpec(@PathParam("id") String id, @Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { @@ -1395,7 +1387,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Produces(MediaType.APPLICATION_JSON) public Response getSubTaskState(@PathParam("id") String id, @Context final HttpServletRequest req) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1417,7 +1409,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Context final HttpServletRequest req ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); final ParallelIndexTaskRunner currentRunner = getCurrentRunner(); if (currentRunner == null) { return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build(); @@ -1569,7 +1561,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetRowStatsAndUnparseableEvents(full, false).lhs).build(); } @@ -1614,8 +1606,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); - + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetLiveReports(full)).build(); } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 267df485a64..f979bc89ffc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -67,7 +67,6 @@ import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.segment.realtime.firehose.ChatHandler; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; @@ -488,7 +487,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); Map> events = new HashMap<>(); boolean needsBuildSegments = false; @@ -563,7 +562,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetRowStats(full)).build(); } @@ -595,7 +594,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand @QueryParam("full") String full ) { - IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); + authorizeRequestForDatasourceWrite(req, authorizerMapper); return Response.ok(doGetLiveReports(full)).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 0076eb0335b..1f596dd327d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -575,11 +575,11 @@ public class OverlordResource } } // early authorization check if datasource != null - // fail fast if user not authorized to access datasource + // fail fast if user not authorized to write to datasource if (dataSource != null) { final ResourceAction resourceAction = new ResourceAction( new Resource(dataSource, ResourceType.DATASOURCE), - Action.READ + Action.WRITE ); final Access authResult = AuthorizationUtils.authorizeResourceAction( req, @@ -987,7 +987,7 @@ public class OverlordResource ); } return Collections.singletonList( - new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.READ) + new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.WRITE) ); }; List optionalTypeFilteredList = collectionToFilter; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 74c5b6192cb..91046661e5d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -65,7 +65,7 @@ import java.util.stream.Collectors; @Path("/druid/indexer/v1/supervisor") public class SupervisorResource { - private static final Function> SPEC_DATASOURCE_READ_RA_GENERATOR = + private static final Function> SPEC_DATASOURCE_WRITE_RA_GENERATOR = supervisorSpec -> { if (supervisorSpec.getSpec() == null) { return null; @@ -75,7 +75,7 @@ public class SupervisorResource } return Iterables.transform( supervisorSpec.getSpec().getDataSources(), - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR + AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR ); }; @@ -376,7 +376,7 @@ public class SupervisorResource AuthorizationUtils.filterAuthorizedResources( req, manager.getSupervisorHistory(), - SPEC_DATASOURCE_READ_RA_GENERATOR, + SPEC_DATASOURCE_WRITE_RA_GENERATOR, authorizerMapper ) ).build() @@ -401,7 +401,7 @@ public class SupervisorResource AuthorizationUtils.filterAuthorizedResources( req, historyForId, - SPEC_DATASOURCE_READ_RA_GENERATOR, + SPEC_DATASOURCE_WRITE_RA_GENERATOR, authorizerMapper ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index fba93526aad..50bf440b67b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -81,8 +81,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; import org.apache.druid.segment.realtime.firehose.ChatHandler; -import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CollectionUtils; @@ -1361,12 +1359,10 @@ public abstract class SeekableStreamIndexTaskRunner getCurrentOffsets(@Context final HttpServletRequest req) { - authorizationCheck(req, Action.READ); + authorizeRequest(req); return getCurrentOffsets(); } @@ -1482,7 +1478,7 @@ public abstract class SeekableStreamIndexTaskRunner getEndOffsetsHTTP(@Context final HttpServletRequest req) { - authorizationCheck(req, Action.READ); + authorizeRequest(req); return getEndOffsets(); } @@ -1502,7 +1498,7 @@ public abstract class SeekableStreamIndexTaskRunner events = IndexTaskUtils.getMessagesFromSavedParseExceptions( parseExceptionHandler.getSavedParseExceptions() ); @@ -1726,7 +1722,7 @@ public abstract class SeekableStreamIndexTaskRunner { + final String username = authenticationResult.getIdentity(); + if (!resource.getType().equals(ResourceType.DATASOURCE) || username == null) { + return new Access(false); + } else if (action == Action.WRITE) { + return new Access(username.equals(datasourceWriteUser)); + } else { + return new Access(username.equals(datasourceReadUser)); + } + }; + } + }; + + // Create test target + final IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + temporaryFolder.newFolder(), + null, + null, + createTuningConfigWithMaxRowsPerSegment(2, true), + false, + false + ), + null + ); + + // Verify that datasourceWriteUser is successfully authorized + HttpServletRequest writeUserRequest = EasyMock.mock(HttpServletRequest.class); + expectAuthorizationTokenCheck(datasourceWriteUser, writeUserRequest); + EasyMock.replay(writeUserRequest); + indexTask.authorizeRequestForDatasourceWrite(writeUserRequest, authorizerMapper); + + // Verify that datasourceReadUser is not successfully authorized + HttpServletRequest readUserRequest = EasyMock.mock(HttpServletRequest.class); + expectAuthorizationTokenCheck(datasourceReadUser, readUserRequest); + EasyMock.replay(readUserRequest); + expectedException.expect(ForbiddenException.class); + indexTask.authorizeRequestForDatasourceWrite(readUserRequest, authorizerMapper); + } + + private void expectAuthorizationTokenCheck(String username, HttpServletRequest request) + { + AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .andReturn(authenticationResult) + .atLeastOnce(); + + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false); + EasyMock.expectLastCall().anyTimes(); + + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + } + @Test public void testEqualsAndHashCode() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index c4751669b27..ae494eee3bf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -71,6 +71,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.util.Arrays; @@ -116,10 +117,24 @@ public class OverlordResourceTest @Override public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action) { - if (resource.getName().equals("allow")) { - return new Access(true); - } else { - return new Access(false); + final String username = authenticationResult.getIdentity(); + switch (resource.getName()) { + case "allow": + return new Access(true); + case Datasources.WIKIPEDIA: + // All users can read wikipedia but only writer can write + return new Access( + action == Action.READ + || (action == Action.WRITE && Users.WIKI_WRITER.equals(username)) + ); + case Datasources.BUZZFEED: + // All users can read buzzfeed but only writer can write + return new Access( + action == Action.READ + || (action == Action.WRITE && Users.BUZZ_WRITER.equals(username)) + ); + default: + return new Access(false); } } @@ -841,6 +856,104 @@ public class OverlordResourceTest Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); } + @Test + public void testGetTasksRequiresDatasourceWrite() + { + // Setup mocks for a user who has write access to "wikipedia" + // and read access to "buzzfeed" + expectAuthorizationTokenCheck(Users.WIKI_WRITER); + + // Setup mocks to return completed, active, known, pending and running tasks + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + ImmutableList.of( + createTaskInfo("id_5", Datasources.WIKIPEDIA), + createTaskInfo("id_6", Datasources.BUZZFEED) + ) + ); + + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + ImmutableList.of( + createTaskInfo("id_1", Datasources.WIKIPEDIA), + createTaskInfo("id_2", Datasources.BUZZFEED) + ) + ); + + EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem("id_1", null), + new MockTaskRunnerWorkItem("id_4", null) + ) + ).atLeastOnce(); + + EasyMock.>expect(taskRunner.getPendingTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem("id_4", null) + ) + ); + + EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( + ImmutableList.of( + new MockTaskRunnerWorkItem("id_1", null) + ) + ); + + // Replay all mocks + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + // Verify that only the tasks of write access datasource are returned + List responseObjects = (List) overlordResource + .getTasks(null, null, null, null, null, req) + .getEntity(); + Assert.assertEquals(2, responseObjects.size()); + for (TaskStatusPlus taskStatus : responseObjects) { + Assert.assertEquals(Datasources.WIKIPEDIA, taskStatus.getDataSource()); + } + } + + @Test + public void testGetTasksFilterByDatasourceRequiresWrite() + { + // Setup mocks for a user who has write access to "wikipedia" + // and read access to "buzzfeed" + expectAuthorizationTokenCheck(Users.WIKI_WRITER); + + // Setup mocks to return completed, active, known, pending and running tasks + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + ImmutableList.of( + createTaskInfo("id_5", Datasources.WIKIPEDIA), + createTaskInfo("id_6", Datasources.BUZZFEED) + ) + ); + + EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + ImmutableList.of( + createTaskInfo("id_1", Datasources.WIKIPEDIA), + createTaskInfo("id_2", Datasources.BUZZFEED) + ) + ); + + // Replay all mocks + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + // Verify that only the tasks of write access datasource are returned + expectedException.expect(WebApplicationException.class); + overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req); + } + @Test public void testGetNullCompleteTask() { @@ -926,6 +1039,27 @@ public class OverlordResourceTest overlordResource.taskPost(task, req); } + @Test + public void testTaskPostDeniesDatasourceReadUser() + { + expectAuthorizationTokenCheck(Users.WIKI_WRITER); + + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter + ); + + // Verify that taskPost fails for user who has only datasource read access + Task task = NoopTask.create(Datasources.BUZZFEED); + expectedException.expect(ForbiddenException.class); + expectedException.expect(ForbiddenException.class); + overlordResource.taskPost(task, req); + } + @Test public void testKillPendingSegments() { @@ -1317,7 +1451,12 @@ public class OverlordResourceTest private void expectAuthorizationTokenCheck() { - AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null, null); + expectAuthorizationTokenCheck(Users.DRUID); + } + + private void expectAuthorizationTokenCheck(String username) + { + AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) @@ -1360,6 +1499,36 @@ public class OverlordResourceTest }; } + private TaskInfo createTaskInfo(String taskId, String datasource) + { + return new TaskInfo<>( + taskId, + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success(taskId), + datasource, + getTaskWithIdAndDatasource(taskId, datasource) + ); + } + + /** + * Usernames to use in the tests. + */ + private static class Users + { + private static final String DRUID = "druid"; + private static final String WIKI_WRITER = "Wiki Writer"; + private static final String BUZZ_WRITER = "Buzz Writer"; + } + + /** + * Datasource names to use in the tests. + */ + private static class Datasources + { + private static final String WIKIPEDIA = "wikipedia"; + private static final String BUZZFEED = "buzzfeed"; + } + private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem { public MockTaskRunnerWorkItem( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java index d38165ccd1b..118b8b5f692 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/SupervisorResourceFilterTest.java @@ -130,7 +130,7 @@ public class SupervisorResourceFilterTest ) { expect(containerRequest.getPathSegments()) - .andReturn(getPathSegments("/druid/indexer/v1/supervisor/datasource1")) + .andReturn(getPathSegments(path)) .anyTimes(); expect(containerRequest.getMethod()).andReturn(requestMethod).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index c22460ec926..8c77ea0c97f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -30,10 +30,12 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.ResourceType; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -92,7 +94,14 @@ public class SupervisorResourceTest extends EasyMockSupport @Override public Authorizer getAuthorizer(String name) { + // Create an Authorizer that only allows Datasource WRITE requests + // because all SupervisorResource APIs must only check Datasource WRITE access return (authenticationResult, resource, action) -> { + if (!resource.getType().equals(ResourceType.DATASOURCE) + || action != Action.WRITE) { + return new Access(false); + } + if (authenticationResult.getIdentity().equals("druid")) { return Access.OK; } else { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java new file mode 100644 index 00000000000..3eba53fe934 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.Authorizer; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceType; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Consumer; + +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; + +/** + * Unit Test to test authorization requirements of + * {@link SeekableStreamIndexTaskRunner} and {@link SeekableStreamIndexTask}. + */ +public class SeekableStreamIndexTaskRunnerAuthTest +{ + + /** + * Test target. + */ + private TestSeekableStreamIndexTaskRunner taskRunner; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() + { + // Create an AuthorizerMapper that only allows access to a Datasource resource + AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) + { + @Override + public Authorizer getAuthorizer(String name) + { + return (authenticationResult, resource, action) -> { + final String username = authenticationResult.getIdentity(); + + // Allow access to a Datasource if + // - any user requests Read access + // - or, Datasource Write User requests Write access + if (resource.getType().equals(ResourceType.DATASOURCE)) { + return new Access( + action == Action.READ + || (action == Action.WRITE && username.equals(Users.DATASOURCE_WRITE)) + ); + } + + // Do not allow access to any other resource + return new Access(false); + }; + } + }; + + DataSchema dataSchema = new DataSchema( + "datasource", + new TimestampSpec(null, null, null), + new DimensionsSpec(Collections.emptyList()), + new AggregatorFactory[]{}, + new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()), + TransformSpec.NONE, + null, + null + ); + SeekableStreamIndexTaskTuningConfig tuningConfig = mock(SeekableStreamIndexTaskTuningConfig.class); + /*expect(tuningConfig.getIntermediateHandoffPeriod()).andReturn(Period.minutes(10)); + expect(tuningConfig.isLogParseExceptions()).andReturn(false); + expect(tuningConfig.getMaxParseExceptions()).andReturn(1000); + expect(tuningConfig.getMaxSavedParseExceptions()).andReturn(100); + + replay(tuningConfig);*/ + + SeekableStreamIndexTaskIOConfig ioConfig = new TestSeekableStreamIndexTaskIOConfig(); + + // Initiliaze task and task runner + SeekableStreamIndexTask indexTask + = new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig, ioConfig); + taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask, authorizerMapper); + } + + @Test + public void testGetStatusHttp() + { + verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStatusHTTP); + } + + @Test + public void testGetStartTime() + { + verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStartTime); + } + + @Test + public void testStop() + { + verifyOnlyDatasourceWriteUserCanAccess(taskRunner::stop); + } + + @Test + public void testPauseHttp() + { + verifyOnlyDatasourceWriteUserCanAccess(req -> { + try { + taskRunner.pauseHTTP(req); + } + catch (InterruptedException e) { + // ignore + } + }); + } + + @Test + public void testResumeHttp() + { + verifyOnlyDatasourceWriteUserCanAccess(req -> { + try { + taskRunner.resumeHTTP(req); + } + catch (InterruptedException e) { + + } + }); + } + + @Test + public void testGetEndOffsets() + { + verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCurrentOffsets); + } + + @Test + public void testSetEndOffsetsHttp() + { + verifyOnlyDatasourceWriteUserCanAccess(request -> { + try { + taskRunner.setEndOffsetsHTTP(Collections.emptyMap(), false, request); + } + catch (InterruptedException e) { + + } + }); + } + + @Test + public void testGetCheckpointsHttp() + { + verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCheckpointsHTTP); + } + + + private void verifyOnlyDatasourceWriteUserCanAccess( + Consumer method + ) + { + // Verify that datasource write user can access + HttpServletRequest allowedRequest = createRequest(Users.DATASOURCE_WRITE); + replay(allowedRequest); + method.accept(allowedRequest); + + // Verify that no other user can access + HttpServletRequest blockedRequest = createRequest(Users.DATASOURCE_READ); + replay(blockedRequest); + expectedException.expect(ForbiddenException.class); + method.accept(blockedRequest); + } + + private HttpServletRequest createRequest(String username) + { + HttpServletRequest request = mock(HttpServletRequest.class); + + AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .andReturn(authenticationResult) + .atLeastOnce(); + + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false); + EasyMock.expectLastCall().anyTimes(); + + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + + return request; + } + + /** + * Dummy implementation used as the test target to test out the non-abstract methods. + */ + private static class TestSeekableStreamIndexTaskRunner + extends SeekableStreamIndexTaskRunner + { + + private TestSeekableStreamIndexTaskRunner( + SeekableStreamIndexTask task, + AuthorizerMapper authorizerMapper + ) + { + super(task, null, authorizerMapper, LockGranularity.SEGMENT); + } + + @Override + protected boolean isEndOfShard(String seqNum) + { + return false; + } + + @Nullable + @Override + protected TreeMap> getCheckPointsFromContext( + TaskToolbox toolbox, + String checkpointsString + ) + { + return null; + } + + @Override + protected String getNextStartOffset(String sequenceNumber) + { + return null; + } + + @Override + protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata( + ObjectMapper mapper, + Object object + ) + { + return null; + } + + @Nonnull + @Override + protected List> getRecords( + RecordSupplier recordSupplier, + TaskToolbox toolbox + ) + { + return null; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetadata( + SeekableStreamSequenceNumbers partitions + ) + { + return null; + } + + @Override + protected OrderedSequenceNumber createSequenceNumber(String sequenceNumber) + { + return null; + } + + @Override + protected void possiblyResetDataSourceMetadata( + TaskToolbox toolbox, + RecordSupplier recordSupplier, + Set> assignment + ) + { + + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference>> getSequenceMetadataTypeReference() + { + return null; + } + } + + private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask + { + + public TestSeekableStreamIndexTask( + String id, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig + ) + { + super(id, null, dataSchema, tuningConfig, ioConfig, null, null); + } + + @Override + public String getType() + { + return null; + } + + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return null; + } + + @Override + protected RecordSupplier newTaskRecordSupplier() + { + return null; + } + } + + private static class TestSeekableStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig + { + public TestSeekableStreamIndexTaskIOConfig() + { + super( + null, + "someSequence", + new SeekableStreamStartSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap(), null), + new SeekableStreamEndSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap()), + false, + DateTimes.nowUtc().minusDays(2), + DateTimes.nowUtc(), + new CsvInputFormat(null, null, true, null, 0) + ); + } + } + + /** + * Usernames used in the tests. + */ + private static class Users + { + private static final String DATASOURCE_READ = "datasourceRead"; + private static final String DATASOURCE_WRITE = "datasourceWrite"; + } + +} diff --git a/integration-tests/docker/ldap-configs/bootstrap.ldif b/integration-tests/docker/ldap-configs/bootstrap.ldif index 9614a782e01..d88265f1466 100644 --- a/integration-tests/docker/ldap-configs/bootstrap.ldif +++ b/integration-tests/docker/ldap-configs/bootstrap.ldif @@ -53,41 +53,41 @@ description: Admin users uniqueMember: uid=admin,ou=Users,dc=example,dc=org uniqueMember: uid=druid_system,ou=Users,dc=example,dc=org -dn: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org -uid: datasourceOnlyUser -cn: datasourceOnlyUser -sn: datasourceOnlyUser +dn: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org +uid: datasourceReadOnlyUser +cn: datasourceReadOnlyUser +sn: datasourceReadOnlyUser objectClass: top objectClass: posixAccount objectClass: inetOrgPerson -homeDirectory: /home/datasourceOnlyUser +homeDirectory: /home/datasourceReadOnlyUser uidNumber: 3 gidNumber: 3 userPassword: helloworld -dn: cn=datasourceOnlyGroup,ou=Groups,dc=example,dc=org +dn: cn=datasourceReadOnlyGroup,ou=Groups,dc=example,dc=org objectClass: groupOfUniqueNames -cn: datasourceOnlyGroup -description: datasourceOnlyGroup users -uniqueMember: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org +cn: datasourceReadOnlyGroup +description: datasourceReadOnlyGroup users +uniqueMember: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org -dn: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org -uid: datasourceWithStateUser -cn: datasourceWithStateUser -sn: datasourceWithStateUser +dn: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org +uid: datasourceReadWithStateUser +cn: datasourceReadWithStateUser +sn: datasourceReadWithStateUser objectClass: top objectClass: posixAccount objectClass: inetOrgPerson -homeDirectory: /home/datasourceWithStateUser +homeDirectory: /home/datasourceReadWithStateUser uidNumber: 4 gidNumber: 4 userPassword: helloworld -dn: cn=datasourceWithStateGroup,ou=Groups,dc=example,dc=org +dn: cn=datasourceReadWithStateGroup,ou=Groups,dc=example,dc=org objectClass: groupOfUniqueNames -cn: datasourceWithStateGroup -description: datasourceWithStateGroup users -uniqueMember: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org +cn: datasourceReadWithStateGroup +description: datasourceReadWithStateGroup users +uniqueMember: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org dn: uid=stateOnlyUser,ou=Users,dc=example,dc=org uid: stateOnlyUser @@ -137,20 +137,38 @@ uidNumber: 7 gidNumber: 7 userPassword: helloworld -dn: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org -uid: datasourceAndSysUser -cn: datasourceAndSysUser -sn: datasourceAndSysUser +dn: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org +uid: datasourceReadAndSysUser +cn: datasourceReadAndSysUser +sn: datasourceReadAndSysUser objectClass: top objectClass: posixAccount objectClass: inetOrgPerson -homeDirectory: /home/datasourceAndSysUser +homeDirectory: /home/datasourceReadAndSysUser uidNumber: 8 gidNumber: 8 userPassword: helloworld -dn: cn=datasourceWithSysGroup,ou=Groups,dc=example,dc=org +dn: cn=datasourceReadWithSysGroup,ou=Groups,dc=example,dc=org objectClass: groupOfUniqueNames -cn: datasourceWithSysGroup -description: datasourceWithSysGroup users -uniqueMember: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org +cn: datasourceReadWithSysGroup +description: datasourceReadWithSysGroup users +uniqueMember: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org + +dn: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org +uid: datasourceWriteAndSysUser +cn: datasourceWriteAndSysUser +sn: datasourceWriteAndSysUser +objectClass: top +objectClass: posixAccount +objectClass: inetOrgPerson +homeDirectory: /home/datasourceWriteAndSysUser +uidNumber: 8 +gidNumber: 8 +userPassword: helloworld + +dn: cn=datasourceWriteWithSysGroup,ou=Groups,dc=example,dc=org +objectClass: groupOfUniqueNames +cn: datasourceWriteWithSysGroup +description: datasourceWriteWithSysGroup users +uniqueMember: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java index 42556ad2531..c29be8d4bde 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java @@ -98,7 +98,7 @@ public abstract class AbstractAuthConfigurationTest * create a ResourceAction set of permissions that can only read a 'auth_test' datasource, for Authorizer * implementations which use ResourceAction pattern matching */ - protected static final List DATASOURCE_ONLY_PERMISSIONS = Collections.singletonList( + protected static final List DATASOURCE_READ_ONLY_PERMISSIONS = Collections.singletonList( new ResourceAction( new Resource("auth_test", ResourceType.DATASOURCE), Action.READ @@ -109,7 +109,7 @@ public abstract class AbstractAuthConfigurationTest * create a ResourceAction set of permissions that can only read 'auth_test' + partial SYSTEM_TABLE, for Authorizer * implementations which use ResourceAction pattern matching */ - protected static final List DATASOURCE_SYS_PERMISSIONS = ImmutableList.of( + protected static final List DATASOURCE_READ_SYS_PERMISSIONS = ImmutableList.of( new ResourceAction( new Resource("auth_test", ResourceType.DATASOURCE), Action.READ @@ -134,11 +134,39 @@ public abstract class AbstractAuthConfigurationTest ) ); + /** + * create a ResourceAction set of permissions that can write datasource 'auth_test' + */ + protected static final List DATASOURCE_WRITE_SYS_PERMISSIONS = ImmutableList.of( + new ResourceAction( + new Resource("auth_test", ResourceType.DATASOURCE), + Action.WRITE + ), + new ResourceAction( + new Resource("segments", ResourceType.SYSTEM_TABLE), + Action.READ + ), + // test missing state permission but having servers permission + new ResourceAction( + new Resource("servers", ResourceType.SYSTEM_TABLE), + Action.READ + ), + // test missing state permission but having server_segments permission + new ResourceAction( + new Resource("server_segments", ResourceType.SYSTEM_TABLE), + Action.READ + ), + new ResourceAction( + new Resource("tasks", ResourceType.SYSTEM_TABLE), + Action.READ + ) + ); + /** * create a ResourceAction set of permissions that can only read 'auth_test' + STATE + SYSTEM_TABLE read access, for * Authorizer implementations which use ResourceAction pattern matching */ - protected static final List DATASOURCE_SYS_STATE_PERMISSIONS = ImmutableList.of( + protected static final List DATASOURCE_READ_SYS_STATE_PERMISSIONS = ImmutableList.of( new ResourceAction( new Resource("auth_test", ResourceType.DATASOURCE), Action.READ @@ -187,16 +215,18 @@ public abstract class AbstractAuthConfigurationTest protected CoordinatorResourceTestClient coordinatorClient; protected HttpClient adminClient; - protected HttpClient datasourceOnlyUserClient; - protected HttpClient datasourceAndSysUserClient; - protected HttpClient datasourceWithStateUserClient; + protected HttpClient datasourceReadOnlyUserClient; + protected HttpClient datasourceReadAndSysUserClient; + protected HttpClient datasourceWriteAndSysUserClient; + protected HttpClient datasourceReadWithStateUserClient; protected HttpClient stateOnlyUserClient; protected HttpClient internalSystemClient; - protected abstract void setupDatasourceOnlyUser() throws Exception; - protected abstract void setupDatasourceAndSysTableUser() throws Exception; - protected abstract void setupDatasourceAndSysAndStateUser() throws Exception; + protected abstract void setupDatasourceReadOnlyUser() throws Exception; + protected abstract void setupDatasourceReadAndSysTableUser() throws Exception; + protected abstract void setupDatasourceWriteAndSysTableUser() throws Exception; + protected abstract void setupDatasourceReadAndSysAndStateUser() throws Exception; protected abstract void setupSysTableAndStateOnlyUser() throws Exception; protected abstract void setupTestSpecificHttpClients() throws Exception; protected abstract String getAuthenticatorName(); @@ -242,44 +272,44 @@ public abstract class AbstractAuthConfigurationTest } @Test - public void test_systemSchemaAccess_datasourceOnlyUser() throws Exception + public void test_systemSchemaAccess_datasourceReadOnlyUser() throws Exception { // check that we can access a datasource-permission restricted resource on the broker HttpUtil.makeRequest( - datasourceOnlyUserClient, + datasourceReadOnlyUserClient, HttpMethod.GET, config.getBrokerUrl() + "/druid/v2/datasources/auth_test", null ); // as user that can only read auth_test - LOG.info("Checking sys.segments query as datasourceOnlyUser..."); + LOG.info("Checking sys.segments query as datasourceReadOnlyUser..."); verifySystemSchemaQueryFailure( - datasourceOnlyUserClient, + datasourceReadOnlyUserClient, SYS_SCHEMA_SEGMENTS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Allowed:false, Message:\"}" ); - LOG.info("Checking sys.servers query as datasourceOnlyUser..."); + LOG.info("Checking sys.servers query as datasourceReadOnlyUser..."); verifySystemSchemaQueryFailure( - datasourceOnlyUserClient, + datasourceReadOnlyUserClient, SYS_SCHEMA_SERVERS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Allowed:false, Message:\"}" ); - LOG.info("Checking sys.server_segments query as datasourceOnlyUser..."); + LOG.info("Checking sys.server_segments query as datasourceReadOnlyUser..."); verifySystemSchemaQueryFailure( - datasourceOnlyUserClient, + datasourceReadOnlyUserClient, SYS_SCHEMA_SERVER_SEGMENTS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Allowed:false, Message:\"}" ); - LOG.info("Checking sys.tasks query as datasourceOnlyUser..."); + LOG.info("Checking sys.tasks query as datasourceReadOnlyUser..."); verifySystemSchemaQueryFailure( - datasourceOnlyUserClient, + datasourceReadOnlyUserClient, SYS_SCHEMA_TASKS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Allowed:false, Message:\"}" @@ -287,83 +317,119 @@ public abstract class AbstractAuthConfigurationTest } @Test - public void test_systemSchemaAccess_datasourceAndSysUser() throws Exception + public void test_systemSchemaAccess_datasourceReadAndSysUser() throws Exception { // check that we can access a datasource-permission restricted resource on the broker HttpUtil.makeRequest( - datasourceAndSysUserClient, + datasourceReadAndSysUserClient, HttpMethod.GET, config.getBrokerUrl() + "/druid/v2/datasources/auth_test", null ); // as user that can only read auth_test - LOG.info("Checking sys.segments query as datasourceAndSysUser..."); + LOG.info("Checking sys.segments query as datasourceReadAndSysUser..."); verifySystemSchemaQuery( - datasourceAndSysUserClient, + datasourceReadAndSysUserClient, SYS_SCHEMA_SEGMENTS_QUERY, adminSegments.stream() .filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource"))) .collect(Collectors.toList()) ); - LOG.info("Checking sys.servers query as datasourceAndSysUser..."); + LOG.info("Checking sys.servers query as datasourceReadAndSysUser..."); verifySystemSchemaQueryFailure( - datasourceAndSysUserClient, + datasourceReadAndSysUserClient, SYS_SCHEMA_SERVERS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}" ); - LOG.info("Checking sys.server_segments query as datasourceAndSysUser..."); + LOG.info("Checking sys.server_segments query as datasourceReadAndSysUser..."); verifySystemSchemaQueryFailure( - datasourceAndSysUserClient, + datasourceReadAndSysUserClient, SYS_SCHEMA_SERVER_SEGMENTS_QUERY, HttpResponseStatus.FORBIDDEN, "{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}" ); - LOG.info("Checking sys.tasks query as datasourceAndSysUser..."); + // Verify that sys.tasks result is empty as it is filtered by Datasource WRITE access + LOG.info("Checking sys.tasks query as datasourceReadAndSysUser..."); verifySystemSchemaQuery( - datasourceAndSysUserClient, + datasourceReadAndSysUserClient, + SYS_SCHEMA_TASKS_QUERY, + Collections.emptyList() + ); + } + + @Test + public void test_systemSchemaAccess_datasourceWriteAndSysUser() throws Exception + { + // Verify that sys.segments result is empty as it is filtered by Datasource READ access + LOG.info("Checking sys.segments query as datasourceWriteAndSysUser..."); + verifySystemSchemaQuery( + datasourceWriteAndSysUserClient, + SYS_SCHEMA_SEGMENTS_QUERY, + Collections.emptyList() + ); + + LOG.info("Checking sys.servers query as datasourceWriteAndSysUser..."); + verifySystemSchemaQueryFailure( + datasourceWriteAndSysUserClient, + SYS_SCHEMA_SERVERS_QUERY, + HttpResponseStatus.FORBIDDEN, + "{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}" + ); + + LOG.info("Checking sys.server_segments query as datasourceWriteAndSysUser..."); + verifySystemSchemaQueryFailure( + datasourceWriteAndSysUserClient, + SYS_SCHEMA_SERVER_SEGMENTS_QUERY, + HttpResponseStatus.FORBIDDEN, + "{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}" + ); + + LOG.info("Checking sys.tasks query as datasourceWriteAndSysUser..."); + verifySystemSchemaQuery( + datasourceWriteAndSysUserClient, SYS_SCHEMA_TASKS_QUERY, adminTasks.stream() - .filter((taskEntry) -> "auth_test".equals(taskEntry.get("datasource"))) + .filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource"))) .collect(Collectors.toList()) ); } @Test - public void test_systemSchemaAccess_datasourceAndSysWithStateUser() throws Exception + public void test_systemSchemaAccess_datasourceReadAndSysWithStateUser() throws Exception { // check that we can access a state-permission restricted resource on the broker HttpUtil.makeRequest( - datasourceWithStateUserClient, + datasourceReadWithStateUserClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null ); // as user that can read auth_test and STATE - LOG.info("Checking sys.segments query as datasourceWithStateUser..."); + LOG.info("Checking sys.segments query as datasourceReadWithStateUser..."); verifySystemSchemaQuery( - datasourceWithStateUserClient, + datasourceReadWithStateUserClient, SYS_SCHEMA_SEGMENTS_QUERY, adminSegments.stream() .filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource"))) .collect(Collectors.toList()) ); - LOG.info("Checking sys.servers query as datasourceWithStateUser..."); + LOG.info("Checking sys.servers query as datasourceReadWithStateUser..."); verifySystemSchemaServerQuery( - datasourceWithStateUserClient, + datasourceReadWithStateUserClient, SYS_SCHEMA_SERVERS_QUERY, adminServers ); - LOG.info("Checking sys.server_segments query as datasourceWithStateUser..."); + LOG.info("Checking sys.server_segments query as datasourceReadWithStateUser..."); verifySystemSchemaQuery( - datasourceWithStateUserClient, + datasourceReadWithStateUserClient, SYS_SCHEMA_SERVER_SEGMENTS_QUERY, adminServerSegments.stream() .filter((serverSegmentEntry) -> ((String) serverSegmentEntry.get("segment_id")).contains( @@ -371,13 +437,12 @@ public abstract class AbstractAuthConfigurationTest .collect(Collectors.toList()) ); - LOG.info("Checking sys.tasks query as datasourceWithStateUser..."); + // Verify that sys.tasks result is empty as it is filtered by Datasource WRITE access + LOG.info("Checking sys.tasks query as datasourceReadWithStateUser..."); verifySystemSchemaQuery( - datasourceWithStateUserClient, + datasourceReadWithStateUserClient, SYS_SCHEMA_TASKS_QUERY, - adminTasks.stream() - .filter((taskEntry) -> "auth_test".equals(taskEntry.get("datasource"))) - .collect(Collectors.toList()) + Collections.emptyList() ); } @@ -500,9 +565,10 @@ public abstract class AbstractAuthConfigurationTest protected void setupHttpClientsAndUsers() throws Exception { setupHttpClients(); - setupDatasourceOnlyUser(); - setupDatasourceAndSysTableUser(); - setupDatasourceAndSysAndStateUser(); + setupDatasourceReadOnlyUser(); + setupDatasourceReadAndSysTableUser(); + setupDatasourceWriteAndSysTableUser(); + setupDatasourceReadAndSysAndStateUser(); setupSysTableAndStateOnlyUser(); } @@ -766,18 +832,23 @@ public abstract class AbstractAuthConfigurationTest httpClient ); - datasourceOnlyUserClient = new CredentialedHttpClient( - new BasicCredentials("datasourceOnlyUser", "helloworld"), + datasourceReadOnlyUserClient = new CredentialedHttpClient( + new BasicCredentials("datasourceReadOnlyUser", "helloworld"), httpClient ); - datasourceAndSysUserClient = new CredentialedHttpClient( - new BasicCredentials("datasourceAndSysUser", "helloworld"), + datasourceReadAndSysUserClient = new CredentialedHttpClient( + new BasicCredentials("datasourceReadAndSysUser", "helloworld"), httpClient ); - datasourceWithStateUserClient = new CredentialedHttpClient( - new BasicCredentials("datasourceWithStateUser", "helloworld"), + datasourceWriteAndSysUserClient = new CredentialedHttpClient( + new BasicCredentials("datasourceWriteAndSysUser", "helloworld"), + httpClient + ); + + datasourceReadWithStateUserClient = new CredentialedHttpClient( + new BasicCredentials("datasourceReadWithStateUser", "helloworld"), httpClient ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java index 690757f7e31..c87b750d5f1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java @@ -70,38 +70,50 @@ public class ITBasicAuthConfigurationTest extends AbstractAuthConfigurationTest } @Override - protected void setupDatasourceOnlyUser() throws Exception + protected void setupDatasourceReadOnlyUser() throws Exception { createUserAndRoleWithPermissions( adminClient, - "datasourceOnlyUser", + "datasourceReadOnlyUser", "helloworld", - "datasourceOnlyRole", - DATASOURCE_ONLY_PERMISSIONS + "datasourceReadOnlyRole", + DATASOURCE_READ_ONLY_PERMISSIONS ); } @Override - protected void setupDatasourceAndSysTableUser() throws Exception + protected void setupDatasourceReadAndSysTableUser() throws Exception { createUserAndRoleWithPermissions( adminClient, - "datasourceAndSysUser", + "datasourceReadAndSysUser", "helloworld", - "datasourceAndSysRole", - DATASOURCE_SYS_PERMISSIONS + "datasourceReadAndSysRole", + DATASOURCE_READ_SYS_PERMISSIONS ); } @Override - protected void setupDatasourceAndSysAndStateUser() throws Exception + protected void setupDatasourceWriteAndSysTableUser() throws Exception { createUserAndRoleWithPermissions( adminClient, - "datasourceWithStateUser", + "datasourceWriteAndSysUser", "helloworld", - "datasourceWithStateRole", - DATASOURCE_SYS_STATE_PERMISSIONS + "datasourceWriteAndSysRole", + DATASOURCE_WRITE_SYS_PERMISSIONS + ); + } + + @Override + protected void setupDatasourceReadAndSysAndStateUser() throws Exception + { + createUserAndRoleWithPermissions( + adminClient, + "datasourceReadWithStateUser", + "helloworld", + "datasourceReadWithStateRole", + DATASOURCE_READ_SYS_STATE_PERMISSIONS ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java index 97a7c531756..3469f56ed03 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthLdapConfigurationTest.java @@ -120,29 +120,38 @@ public class ITBasicAuthLdapConfigurationTest extends AbstractAuthConfigurationT @Override - protected void setupDatasourceOnlyUser() throws Exception + protected void setupDatasourceReadOnlyUser() throws Exception { createRoleWithPermissionsAndGroupMapping( - "datasourceOnlyGroup", - ImmutableMap.of("datasourceOnlyRole", DATASOURCE_ONLY_PERMISSIONS) + "datasourceReadOnlyGroup", + ImmutableMap.of("datasourceReadOnlyRole", DATASOURCE_READ_ONLY_PERMISSIONS) ); } @Override - protected void setupDatasourceAndSysTableUser() throws Exception + protected void setupDatasourceReadAndSysTableUser() throws Exception { createRoleWithPermissionsAndGroupMapping( - "datasourceWithSysGroup", - ImmutableMap.of("datasourceWithSysRole", DATASOURCE_SYS_PERMISSIONS) + "datasourceReadWithSysGroup", + ImmutableMap.of("datasourceReadWithSysRole", DATASOURCE_READ_SYS_PERMISSIONS) ); } @Override - protected void setupDatasourceAndSysAndStateUser() throws Exception + protected void setupDatasourceWriteAndSysTableUser() throws Exception { createRoleWithPermissionsAndGroupMapping( - "datasourceWithStateGroup", - ImmutableMap.of("datasourceWithStateRole", DATASOURCE_SYS_STATE_PERMISSIONS) + "datasourceWriteWithSysGroup", + ImmutableMap.of("datasourceWriteWithSysRole", DATASOURCE_WRITE_SYS_PERMISSIONS) + ); + } + + @Override + protected void setupDatasourceReadAndSysAndStateUser() throws Exception + { + createRoleWithPermissionsAndGroupMapping( + "datasourceReadWithStateGroup", + ImmutableMap.of("datasourceReadWithStateRole", DATASOURCE_READ_SYS_STATE_PERMISSIONS) ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0562e333314..aa2bdce4c8a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -872,7 +872,7 @@ public class SystemSchema extends AbstractSchema ); Function> raGenerator = task -> Collections.singletonList( - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource())); + AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(task.getDataSource())); final Iterable authorizedTasks = AuthorizationUtils.filterAuthorizedResources( authenticationResult, @@ -1014,7 +1014,7 @@ public class SystemSchema extends AbstractSchema ); Function> raGenerator = supervisor -> Collections.singletonList( - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource())); + AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(supervisor.getSource())); final Iterable authorizedSupervisors = AuthorizationUtils.filterAuthorizedResources( authenticationResult, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index d25a9026f15..b299a3a8eee 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -81,9 +81,12 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.NoopEscalator; +import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable; import org.apache.druid.sql.calcite.table.RowSignatures; @@ -196,14 +199,7 @@ public class SystemSchemaTest extends CalciteTestBase .addMockedMethod("getStatus") .createMock(); request = EasyMock.createMock(Request.class); - authMapper = new AuthorizerMapper(null) - { - @Override - public Authorizer getAuthorizer(String name) - { - return (authenticationResult, resource, action) -> new Access(true); - } - }; + authMapper = createAuthMapper(); final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() @@ -554,33 +550,7 @@ public class SystemSchemaTest extends CalciteTestBase EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); EasyMock.replay(client, request, responseHolder, responseHandler, metadataView); - DataContext dataContext = new DataContext() - { - @Override - public SchemaPlus getRootSchema() - { - return null; - } - - @Override - public JavaTypeFactory getTypeFactory() - { - return null; - } - - @Override - public QueryProvider getQueryProvider() - { - return null; - } - - @Override - public Object get(String name) - { - return CalciteTests.SUPER_USER_AUTH_RESULT; - } - }; - + DataContext dataContext = createDataContext(Users.SUPER); final List rows = segmentsTable.scan(dataContext).toList(); rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); @@ -831,33 +801,7 @@ public class SystemSchemaTest extends CalciteTestBase indexerNodeDiscovery ); - DataContext dataContext = new DataContext() - { - @Override - public SchemaPlus getRootSchema() - { - return null; - } - - @Override - public JavaTypeFactory getTypeFactory() - { - return null; - } - - @Override - public QueryProvider getQueryProvider() - { - return null; - } - - @Override - public Object get(String name) - { - return CalciteTests.SUPER_USER_AUTH_RESULT; - } - }; - + DataContext dataContext = createDataContext(Users.SUPER); final List rows = serversTable.scan(dataContext).toList(); rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0])); @@ -1112,32 +1056,7 @@ public class SystemSchemaTest extends CalciteTestBase .andReturn(immutableDruidServers) .once(); EasyMock.replay(serverView); - DataContext dataContext = new DataContext() - { - @Override - public SchemaPlus getRootSchema() - { - return null; - } - - @Override - public JavaTypeFactory getTypeFactory() - { - return null; - } - - @Override - public QueryProvider getQueryProvider() - { - return null; - } - - @Override - public Object get(String name) - { - return CalciteTests.SUPER_USER_AUTH_RESULT; - } - }; + DataContext dataContext = createDataContext(Users.SUPER); //server_segments table is the join of servers and segments table // it will have 5 rows as follows @@ -1230,32 +1149,7 @@ public class SystemSchemaTest extends CalciteTestBase responseHolder.done(); EasyMock.replay(client, request, responseHandler); - DataContext dataContext = new DataContext() - { - @Override - public SchemaPlus getRootSchema() - { - return null; - } - - @Override - public JavaTypeFactory getTypeFactory() - { - return null; - } - - @Override - public QueryProvider getQueryProvider() - { - return null; - } - - @Override - public Object get(String name) - { - return CalciteTests.SUPER_USER_AUTH_RESULT; - } - }; + DataContext dataContext = createDataContext(Users.SUPER); final List rows = tasksTable.scan(dataContext).toList(); Object[] row0 = rows.get(0); @@ -1294,6 +1188,81 @@ public class SystemSchemaTest extends CalciteTestBase verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); } + @Test + public void testTasksTableAuth() throws Exception + { + SystemSchema.TasksTable tasksTable = new SystemSchema.TasksTable(client, mapper, authMapper); + + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")) + .andReturn(request) + .anyTimes(); + + String json = "[{\n" + + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + + "\t\"type\": \"index\",\n" + + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n" + + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n" + + "\t\"statusCode\": \"FAILED\",\n" + + "\t\"runnerStatusCode\": \"NONE\",\n" + + "\t\"duration\": -1,\n" + + "\t\"location\": {\n" + + "\t\t\"host\": \"testHost\",\n" + + "\t\t\"port\": 1234,\n" + + "\t\t\"tlsPort\": -1\n" + + "\t},\n" + + "\t\"dataSource\": \"wikipedia\",\n" + + "\t\"errorMsg\": null\n" + + "}, {\n" + + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + + "\t\"type\": \"index\",\n" + + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n" + + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n" + + "\t\"statusCode\": \"RUNNING\",\n" + + "\t\"runnerStatusCode\": \"RUNNING\",\n" + + "\t\"duration\": null,\n" + + "\t\"location\": {\n" + + "\t\t\"host\": \"192.168.1.6\",\n" + + "\t\t\"port\": 8100,\n" + + "\t\t\"tlsPort\": -1\n" + + "\t},\n" + + "\t\"dataSource\": \"wikipedia\",\n" + + "\t\"errorMsg\": null\n" + + "}]"; + + HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + + EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject())) + .andReturn(createFullResponseHolder(httpResp, json)) + .andReturn(createFullResponseHolder(httpResp, json)) + .andReturn(createFullResponseHolder(httpResp, json)); + + EasyMock.expect(request.getUrl()) + .andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")) + .anyTimes(); + + EasyMock.replay(client, request, responseHandler); + + // Verify that no row is returned for Datasource Read user + List rows = tasksTable + .scan(createDataContext(Users.DATASOURCE_READ)) + .toList(); + Assert.assertTrue(rows.isEmpty()); + + // Verify that 2 rows are is returned for Datasource Write user + rows = tasksTable + .scan(createDataContext(Users.DATASOURCE_WRITE)) + .toList(); + Assert.assertEquals(2, rows.size()); + + // Verify that 2 rows are returned for Super user + rows = tasksTable + .scan(createDataContext(Users.SUPER)) + .toList(); + Assert.assertEquals(2, rows.size()); + } + @Test public void testSupervisorTable() throws Exception { @@ -1337,7 +1306,111 @@ public class SystemSchemaTest extends CalciteTestBase responseHolder.done(); EasyMock.replay(client, request, responseHandler); - DataContext dataContext = new DataContext() + DataContext dataContext = createDataContext(Users.SUPER); + final List rows = supervisorTable.scan(dataContext).toList(); + + Object[] row0 = rows.get(0); + Assert.assertEquals("wikipedia", row0[0].toString()); + Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString()); + Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString()); + Assert.assertEquals(0L, row0[3]); + Assert.assertEquals("kafka", row0[4].toString()); + Assert.assertEquals("wikipedia", row0[5].toString()); + Assert.assertEquals(0L, row0[6]); + Assert.assertEquals( + "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}", + row0[7].toString() + ); + + // Verify value types. + verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE); + } + + @Test + public void testSupervisorTableAuth() throws Exception + { + SystemSchema.SupervisorsTable supervisorTable = + new SystemSchema.SupervisorsTable(client, mapper, createAuthMapper()); + + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system")) + .andReturn(request) + .anyTimes(); + + final String json = "[{\n" + + "\t\"id\": \"wikipedia\",\n" + + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n" + + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n" + + "\t\"healthy\": false,\n" + + "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}" + + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n" + + "\t\"type\": \"kafka\",\n" + + "\t\"source\": \"wikipedia\",\n" + + "\t\"suspended\": false\n" + + "}]"; + + HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject())) + .andReturn(createFullResponseHolder(httpResponse, json)) + .andReturn(createFullResponseHolder(httpResponse, json)) + .andReturn(createFullResponseHolder(httpResponse, json)); + + EasyMock.expect(responseHandler.getStatus()) + .andReturn(httpResponse.getStatus().getCode()) + .anyTimes(); + EasyMock.expect(request.getUrl()) + .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system")) + .anyTimes(); + + EasyMock.replay(client, request, responseHandler); + + // Verify that no row is returned for Datasource Read user + List rows = supervisorTable + .scan(createDataContext(Users.DATASOURCE_READ)) + .toList(); + Assert.assertTrue(rows.isEmpty()); + + // Verify that 1 row is returned for Datasource Write user + rows = supervisorTable + .scan(createDataContext(Users.DATASOURCE_WRITE)) + .toList(); + Assert.assertEquals(1, rows.size()); + + // Verify that 1 row is returned for Super user + rows = supervisorTable + .scan(createDataContext(Users.SUPER)) + .toList(); + Assert.assertEquals(1, rows.size()); + + // TODO: If needed, verify the first row here + + // TODO: Verify value types. + // verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE); + } + + /** + * Creates a response holder that contains the given json. + */ + private InputStreamFullResponseHolder createFullResponseHolder( + HttpResponse httpResponse, + String json + ) + { + InputStreamFullResponseHolder responseHolder = + new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse); + + byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); + responseHolder.addChunk(bytesToWrite); + responseHolder.done(); + + return responseHolder; + } + + /** + * Creates a DataContext for the given username. + */ + private DataContext createDataContext(String username) + { + return new DataContext() { @Override public SchemaPlus getRootSchema() @@ -1358,28 +1431,40 @@ public class SystemSchemaTest extends CalciteTestBase } @Override - public Object get(String name) + public Object get(String authorizerName) { - return CalciteTests.SUPER_USER_AUTH_RESULT; + return CalciteTests.TEST_SUPERUSER_NAME.equals(username) + ? CalciteTests.SUPER_USER_AUTH_RESULT + : new AuthenticationResult(username, authorizerName, null, null); } }; - final List rows = supervisorTable.scan(dataContext).toList(); + } - Object[] row0 = rows.get(0); - Assert.assertEquals("wikipedia", row0[0].toString()); - Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString()); - Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString()); - Assert.assertEquals(0L, row0[3]); - Assert.assertEquals("kafka", row0[4].toString()); - Assert.assertEquals("wikipedia", row0[5].toString()); - Assert.assertEquals(0L, row0[6]); - Assert.assertEquals( - "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}", - row0[7].toString() - ); + private AuthorizerMapper createAuthMapper() + { + return new AuthorizerMapper(null) + { + @Override + public Authorizer getAuthorizer(String name) + { + return (authenticationResult, resource, action) -> { + final String username = authenticationResult.getIdentity(); - // Verify value types. - verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE); + // Allow access to a Datasource if + // - any user requests Read access + // - Super User or Datasource Write User requests Write access + if (resource.getType().equals(ResourceType.DATASOURCE)) { + return new Access( + action == Action.READ + || username.equals(Users.SUPER) + || username.equals(Users.DATASOURCE_WRITE) + ); + } + + return new Access(true); + }; + } + }; } private static void verifyTypes(final List rows, final RowSignature signature) @@ -1443,4 +1528,14 @@ public class SystemSchemaTest extends CalciteTestBase } } } + + /** + * Usernames to be used in tests. + */ + private static class Users + { + private static final String SUPER = CalciteTests.TEST_SUPERUSER_NAME; + private static final String DATASOURCE_READ = "datasourceRead"; + private static final String DATASOURCE_WRITE = "datasourceWrite"; + } }