Require Datasource WRITE authorization for Supervisor and Task access (#11718)

Follow up PR for #11680

Description
Supervisor and Task APIs are related to ingestion and must always require Datasource WRITE
authorization even if they are purely informative.

Changes
Check Datasource WRITE in SystemSchema for tables "supervisors" and "tasks"
Check Datasource WRITE for APIs /supervisor/history and /supervisor/{id}/history
Check Datasource for all Indexing Task APIs
This commit is contained in:
Kashif Faraz 2021-10-08 10:39:48 +05:30 committed by GitHub
parent 45d0ecbefb
commit f2d6100124
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1168 additions and 310 deletions

View File

@ -139,7 +139,8 @@ Queries on the [system schema tables](../querying/sql.md#system-schema) require
- `segments`: Segments will be filtered based on DATASOURCE READ permissions.
- `servers`: The user requires STATE READ permissions.
- `server_segments`: The user requires STATE READ permissions and segments will be filtered based on DATASOURCE READ permissions.
- `tasks`: Tasks will be filtered based on DATASOURCE READ permissions.
- `tasks`: Druid filters tasks according to DATASOURCE WRITE permissions.
- `supervisors`: Druid filters supervisors according to DATASOURCE WRITE permissions.
## Configuration Propagation

View File

@ -30,9 +30,12 @@ import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
@ -164,6 +167,19 @@ public abstract class AbstractTask implements Task
return TaskStatus.success(getId());
}
/**
* Authorizes WRITE action on a task's datasource
*
* @throws ForbiddenException if not authorized
*/
public void authorizeRequestForDatasourceWrite(
HttpServletRequest request,
AuthorizerMapper authorizerMapper
) throws ForbiddenException
{
IndexTaskUtils.authorizeRequestForDatasourceWrite(request, dataSource, authorizerMapper);
}
@Override
public boolean equals(Object o)
{

View File

@ -84,7 +84,6 @@ import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -530,7 +529,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
@Context final HttpServletRequest req
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
Map<String, Object> averagesMap = new HashMap<>();
@ -556,7 +555,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
@Context final HttpServletRequest req
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptions()
);

View File

@ -63,7 +63,6 @@ import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.mapred.JobClient;
@ -634,7 +633,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@QueryParam("windows") List<Integer> windows
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();

View File

@ -95,7 +95,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -285,7 +284,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetUnparseableEvents(full)).build();
}
@ -394,7 +393,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStats(full)).build();
}
@ -406,7 +405,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();

View File

@ -56,28 +56,25 @@ public class IndexTaskUtils
}
/**
* Authorizes action to be performed on a task's datasource
* Authorizes WRITE action on a task's datasource
*
* @return authorization result
* @throws ForbiddenException if not authorized
*/
public static Access datasourceAuthorizationCheck(
public static void authorizeRequestForDatasourceWrite(
final HttpServletRequest req,
Action action,
String datasource,
AuthorizerMapper authorizerMapper
)
) throws ForbiddenException
{
ResourceAction resourceAction = new ResourceAction(
new Resource(datasource, ResourceType.DATASOURCE),
action
Action.WRITE
);
Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
}
return access;
}
public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final Task task)

View File

@ -53,7 +53,6 @@ import org.apache.druid.indexing.common.task.CurrentSubTaskHolder;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
@ -75,8 +74,6 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingShardSpec;
@ -1176,7 +1173,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
ChatHandlers.authorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (toolbox == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1255,12 +1252,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
ChatHandlers.authorizationCheck(
req,
Action.WRITE,
getDataSource(),
authorizerMapper
);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (currentSubTaskHolder == null || currentSubTaskHolder.getTask() == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
} else {
@ -1278,7 +1270,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getMode(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(isParallelMode() ? "parallel" : "sequential").build();
}
@ -1287,7 +1279,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getPhaseName(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
if (isParallelMode()) {
final ParallelIndexTaskRunner runner = getCurrentRunner();
if (runner == null) {
@ -1305,7 +1297,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getProgress(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1319,7 +1311,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasks(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1333,7 +1325,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1347,7 +1339,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1361,7 +1353,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getCompleteSubTaskSpecs(@Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1375,7 +1367,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskSpec(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
@ -1395,7 +1387,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Produces(MediaType.APPLICATION_JSON)
public Response getSubTaskState(@PathParam("id") String id, @Context final HttpServletRequest req)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1417,7 +1409,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Context final HttpServletRequest req
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
final ParallelIndexTaskRunner currentRunner = getCurrentRunner();
if (currentRunner == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("task is not running yet").build();
@ -1569,7 +1561,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStatsAndUnparseableEvents(full, false).lhs).build();
}
@ -1614,8 +1606,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetLiveReports(full)).build();
}
}

View File

@ -67,7 +67,6 @@ import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -488,7 +487,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
Map<String, List<String>> events = new HashMap<>();
boolean needsBuildSegments = false;
@ -563,7 +562,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetRowStats(full)).build();
}
@ -595,7 +594,7 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
@QueryParam("full") String full
)
{
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
authorizeRequestForDatasourceWrite(req, authorizerMapper);
return Response.ok(doGetLiveReports(full)).build();
}

View File

@ -575,11 +575,11 @@ public class OverlordResource
}
}
// early authorization check if datasource != null
// fail fast if user not authorized to access datasource
// fail fast if user not authorized to write to datasource
if (dataSource != null) {
final ResourceAction resourceAction = new ResourceAction(
new Resource(dataSource, ResourceType.DATASOURCE),
Action.READ
Action.WRITE
);
final Access authResult = AuthorizationUtils.authorizeResourceAction(
req,
@ -987,7 +987,7 @@ public class OverlordResource
);
}
return Collections.singletonList(
new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.READ)
new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.WRITE)
);
};
List<TaskStatusPlus> optionalTypeFilteredList = collectionToFilter;

View File

@ -65,7 +65,7 @@ import java.util.stream.Collectors;
@Path("/druid/indexer/v1/supervisor")
public class SupervisorResource
{
private static final Function<VersionedSupervisorSpec, Iterable<ResourceAction>> SPEC_DATASOURCE_READ_RA_GENERATOR =
private static final Function<VersionedSupervisorSpec, Iterable<ResourceAction>> SPEC_DATASOURCE_WRITE_RA_GENERATOR =
supervisorSpec -> {
if (supervisorSpec.getSpec() == null) {
return null;
@ -75,7 +75,7 @@ public class SupervisorResource
}
return Iterables.transform(
supervisorSpec.getSpec().getDataSources(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR
);
};
@ -376,7 +376,7 @@ public class SupervisorResource
AuthorizationUtils.filterAuthorizedResources(
req,
manager.getSupervisorHistory(),
SPEC_DATASOURCE_READ_RA_GENERATOR,
SPEC_DATASOURCE_WRITE_RA_GENERATOR,
authorizerMapper
)
).build()
@ -401,7 +401,7 @@ public class SupervisorResource
AuthorizationUtils.filterAuthorizedResources(
req,
historyForId,
SPEC_DATASOURCE_READ_RA_GENERATOR,
SPEC_DATASOURCE_WRITE_RA_GENERATOR,
authorizerMapper
)
);

View File

@ -81,8 +81,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResul
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
@ -1361,12 +1359,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
/**
* Authorizes action to be performed on this task's datasource
*
* @return authorization result
*/
private Access authorizationCheck(final HttpServletRequest req, Action action)
private void authorizeRequest(final HttpServletRequest req)
{
return IndexTaskUtils.datasourceAuthorizationCheck(req, action, task.getDataSource(), authorizerMapper);
task.authorizeRequestForDatasourceWrite(req, authorizerMapper);
}
public Appenderator getAppenderator()
@ -1443,7 +1439,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Path("/stop")
public Response stop(@Context final HttpServletRequest req)
{
authorizationCheck(req, Action.WRITE);
authorizeRequest(req);
stopGracefully();
return Response.status(Response.Status.OK).build();
}
@ -1453,7 +1449,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Status getStatusHTTP(@Context final HttpServletRequest req)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return status;
}
@ -1468,7 +1464,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(@Context final HttpServletRequest req)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return getCurrentOffsets();
}
@ -1482,7 +1478,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public Map<PartitionIdType, SequenceOffsetType> getEndOffsetsHTTP(@Context final HttpServletRequest req)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return getEndOffsets();
}
@ -1502,7 +1498,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
authorizeRequest(req);
return setEndOffsets(sequences, finish);
}
@ -1552,7 +1548,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return Response.ok(doGetRowStats()).build();
}
@ -1563,7 +1559,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return Response.ok(doGetLiveReports()).build();
}
@ -1575,7 +1571,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptions()
);
@ -1726,7 +1722,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.READ);
authorizeRequest(req);
return getCheckpoints();
}
@ -1753,7 +1749,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
authorizeRequest(req);
return pause();
}
@ -1808,7 +1804,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Path("/resume")
public Response resumeHTTP(@Context final HttpServletRequest req) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
authorizeRequest(req);
resume();
return Response.status(Response.Status.OK).build();
}
@ -1841,7 +1837,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
@Produces(MediaType.APPLICATION_JSON)
public DateTime getStartTime(@Context final HttpServletRequest req)
{
authorizationCheck(req, Action.WRITE);
authorizeRequest(req);
return startTime;
}

View File

@ -90,6 +90,14 @@ import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@ -111,6 +119,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@ -2604,6 +2613,81 @@ public class IndexTaskTest extends IngestionTestBase
}
}
@Test
public void testAuthorizeRequestForDatasourceWrite() throws Exception
{
// Need to run this only once
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
// Create auth mapper which allows datasourceReadUser to read datasource
// and datasourceWriteUser to write to datasource
final String datasourceWriteUser = "datasourceWriteUser";
final String datasourceReadUser = "datasourceReadUser";
AuthorizerMapper authorizerMapper = new AuthorizerMapper(null) {
@Override
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
final String username = authenticationResult.getIdentity();
if (!resource.getType().equals(ResourceType.DATASOURCE) || username == null) {
return new Access(false);
} else if (action == Action.WRITE) {
return new Access(username.equals(datasourceWriteUser));
} else {
return new Access(username.equals(datasourceReadUser));
}
};
}
};
// Create test target
final IndexTask indexTask = new IndexTask(
null,
null,
createDefaultIngestionSpec(
jsonMapper,
temporaryFolder.newFolder(),
null,
null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
),
null
);
// Verify that datasourceWriteUser is successfully authorized
HttpServletRequest writeUserRequest = EasyMock.mock(HttpServletRequest.class);
expectAuthorizationTokenCheck(datasourceWriteUser, writeUserRequest);
EasyMock.replay(writeUserRequest);
indexTask.authorizeRequestForDatasourceWrite(writeUserRequest, authorizerMapper);
// Verify that datasourceReadUser is not successfully authorized
HttpServletRequest readUserRequest = EasyMock.mock(HttpServletRequest.class);
expectAuthorizationTokenCheck(datasourceReadUser, readUserRequest);
EasyMock.replay(readUserRequest);
expectedException.expect(ForbiddenException.class);
indexTask.authorizeRequestForDatasourceWrite(readUserRequest, authorizerMapper);
}
private void expectAuthorizationTokenCheck(String username, HttpServletRequest request)
{
AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(authenticationResult)
.atLeastOnce();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
EasyMock.expectLastCall().anyTimes();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
}
@Test
public void testEqualsAndHashCode()
{

View File

@ -71,6 +71,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.util.Arrays;
@ -116,9 +117,23 @@ public class OverlordResourceTest
@Override
public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
{
if (resource.getName().equals("allow")) {
final String username = authenticationResult.getIdentity();
switch (resource.getName()) {
case "allow":
return new Access(true);
} else {
case Datasources.WIKIPEDIA:
// All users can read wikipedia but only writer can write
return new Access(
action == Action.READ
|| (action == Action.WRITE && Users.WIKI_WRITER.equals(username))
);
case Datasources.BUZZFEED:
// All users can read buzzfeed but only writer can write
return new Access(
action == Action.READ
|| (action == Action.WRITE && Users.BUZZ_WRITER.equals(username))
);
default:
return new Access(false);
}
}
@ -841,6 +856,104 @@ public class OverlordResourceTest
Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource()));
}
@Test
public void testGetTasksRequiresDatasourceWrite()
{
// Setup mocks for a user who has write access to "wikipedia"
// and read access to "buzzfeed"
expectAuthorizationTokenCheck(Users.WIKI_WRITER);
// Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED)
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of(
createTaskInfo("id_1", Datasources.WIKIPEDIA),
createTaskInfo("id_2", Datasources.BUZZFEED)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null),
new MockTaskRunnerWorkItem("id_4", null)
)
).atLeastOnce();
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getPendingTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_4", null)
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem("id_1", null)
)
);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
// Verify that only the tasks of write access datasource are returned
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, null, null, null, null, req)
.getEntity();
Assert.assertEquals(2, responseObjects.size());
for (TaskStatusPlus taskStatus : responseObjects) {
Assert.assertEquals(Datasources.WIKIPEDIA, taskStatus.getDataSource());
}
}
@Test
public void testGetTasksFilterByDatasourceRequiresWrite()
{
// Setup mocks for a user who has write access to "wikipedia"
// and read access to "buzzfeed"
expectAuthorizationTokenCheck(Users.WIKI_WRITER);
// Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn(
ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED)
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn(
ImmutableList.of(
createTaskInfo("id_1", Datasources.WIKIPEDIA),
createTaskInfo("id_2", Datasources.BUZZFEED)
)
);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
// Verify that only the tasks of write access datasource are returned
expectedException.expect(WebApplicationException.class);
overlordResource.getTasks(null, Datasources.BUZZFEED, null, null, null, req);
}
@Test
public void testGetNullCompleteTask()
{
@ -926,6 +1039,27 @@ public class OverlordResourceTest
overlordResource.taskPost(task, req);
}
@Test
public void testTaskPostDeniesDatasourceReadUser()
{
expectAuthorizationTokenCheck(Users.WIKI_WRITER);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
// Verify that taskPost fails for user who has only datasource read access
Task task = NoopTask.create(Datasources.BUZZFEED);
expectedException.expect(ForbiddenException.class);
expectedException.expect(ForbiddenException.class);
overlordResource.taskPost(task, req);
}
@Test
public void testKillPendingSegments()
{
@ -1317,7 +1451,12 @@ public class OverlordResourceTest
private void expectAuthorizationTokenCheck()
{
AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null, null);
expectAuthorizationTokenCheck(Users.DRUID);
}
private void expectAuthorizationTokenCheck(String username)
{
AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null);
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
@ -1360,6 +1499,36 @@ public class OverlordResourceTest
};
}
private TaskInfo<Task, TaskStatus> createTaskInfo(String taskId, String datasource)
{
return new TaskInfo<>(
taskId,
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success(taskId),
datasource,
getTaskWithIdAndDatasource(taskId, datasource)
);
}
/**
* Usernames to use in the tests.
*/
private static class Users
{
private static final String DRUID = "druid";
private static final String WIKI_WRITER = "Wiki Writer";
private static final String BUZZ_WRITER = "Buzz Writer";
}
/**
* Datasource names to use in the tests.
*/
private static class Datasources
{
private static final String WIKIPEDIA = "wikipedia";
private static final String BUZZFEED = "buzzfeed";
}
private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem
{
public MockTaskRunnerWorkItem(

View File

@ -130,7 +130,7 @@ public class SupervisorResourceFilterTest
)
{
expect(containerRequest.getPathSegments())
.andReturn(getPathSegments("/druid/indexer/v1/supervisor/datasource1"))
.andReturn(getPathSegments(path))
.anyTimes();
expect(containerRequest.getMethod()).andReturn(requestMethod).anyTimes();

View File

@ -30,10 +30,12 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAu
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceType;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@ -92,7 +94,14 @@ public class SupervisorResourceTest extends EasyMockSupport
@Override
public Authorizer getAuthorizer(String name)
{
// Create an Authorizer that only allows Datasource WRITE requests
// because all SupervisorResource APIs must only check Datasource WRITE access
return (authenticationResult, resource, action) -> {
if (!resource.getType().equals(ResourceType.DATASOURCE)
|| action != Action.WRITE) {
return new Access(false);
}
if (authenticationResult.getIdentity().equals("druid")) {
return Access.OK;
} else {

View File

@ -0,0 +1,393 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
/**
* Unit Test to test authorization requirements of
* {@link SeekableStreamIndexTaskRunner} and {@link SeekableStreamIndexTask}.
*/
public class SeekableStreamIndexTaskRunnerAuthTest
{
/**
* Test target.
*/
private TestSeekableStreamIndexTaskRunner taskRunner;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp()
{
// Create an AuthorizerMapper that only allows access to a Datasource resource
AuthorizerMapper authorizerMapper = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
final String username = authenticationResult.getIdentity();
// Allow access to a Datasource if
// - any user requests Read access
// - or, Datasource Write User requests Write access
if (resource.getType().equals(ResourceType.DATASOURCE)) {
return new Access(
action == Action.READ
|| (action == Action.WRITE && username.equals(Users.DATASOURCE_WRITE))
);
}
// Do not allow access to any other resource
return new Access(false);
};
}
};
DataSchema dataSchema = new DataSchema(
"datasource",
new TimestampSpec(null, null, null),
new DimensionsSpec(Collections.emptyList()),
new AggregatorFactory[]{},
new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList()),
TransformSpec.NONE,
null,
null
);
SeekableStreamIndexTaskTuningConfig tuningConfig = mock(SeekableStreamIndexTaskTuningConfig.class);
/*expect(tuningConfig.getIntermediateHandoffPeriod()).andReturn(Period.minutes(10));
expect(tuningConfig.isLogParseExceptions()).andReturn(false);
expect(tuningConfig.getMaxParseExceptions()).andReturn(1000);
expect(tuningConfig.getMaxSavedParseExceptions()).andReturn(100);
replay(tuningConfig);*/
SeekableStreamIndexTaskIOConfig<String, String> ioConfig = new TestSeekableStreamIndexTaskIOConfig();
// Initiliaze task and task runner
SeekableStreamIndexTask<String, String, ByteEntity> indexTask
= new TestSeekableStreamIndexTask("id", dataSchema, tuningConfig, ioConfig);
taskRunner = new TestSeekableStreamIndexTaskRunner(indexTask, authorizerMapper);
}
@Test
public void testGetStatusHttp()
{
verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStatusHTTP);
}
@Test
public void testGetStartTime()
{
verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getStartTime);
}
@Test
public void testStop()
{
verifyOnlyDatasourceWriteUserCanAccess(taskRunner::stop);
}
@Test
public void testPauseHttp()
{
verifyOnlyDatasourceWriteUserCanAccess(req -> {
try {
taskRunner.pauseHTTP(req);
}
catch (InterruptedException e) {
// ignore
}
});
}
@Test
public void testResumeHttp()
{
verifyOnlyDatasourceWriteUserCanAccess(req -> {
try {
taskRunner.resumeHTTP(req);
}
catch (InterruptedException e) {
}
});
}
@Test
public void testGetEndOffsets()
{
verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCurrentOffsets);
}
@Test
public void testSetEndOffsetsHttp()
{
verifyOnlyDatasourceWriteUserCanAccess(request -> {
try {
taskRunner.setEndOffsetsHTTP(Collections.emptyMap(), false, request);
}
catch (InterruptedException e) {
}
});
}
@Test
public void testGetCheckpointsHttp()
{
verifyOnlyDatasourceWriteUserCanAccess(taskRunner::getCheckpointsHTTP);
}
private void verifyOnlyDatasourceWriteUserCanAccess(
Consumer<HttpServletRequest> method
)
{
// Verify that datasource write user can access
HttpServletRequest allowedRequest = createRequest(Users.DATASOURCE_WRITE);
replay(allowedRequest);
method.accept(allowedRequest);
// Verify that no other user can access
HttpServletRequest blockedRequest = createRequest(Users.DATASOURCE_READ);
replay(blockedRequest);
expectedException.expect(ForbiddenException.class);
method.accept(blockedRequest);
}
private HttpServletRequest createRequest(String username)
{
HttpServletRequest request = mock(HttpServletRequest.class);
AuthenticationResult authenticationResult = new AuthenticationResult(username, "druid", null, null);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(authenticationResult)
.atLeastOnce();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, false);
EasyMock.expectLastCall().anyTimes();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
return request;
}
/**
* Dummy implementation used as the test target to test out the non-abstract methods.
*/
private static class TestSeekableStreamIndexTaskRunner
extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
{
private TestSeekableStreamIndexTaskRunner(
SeekableStreamIndexTask<String, String, ByteEntity> task,
AuthorizerMapper authorizerMapper
)
{
super(task, null, authorizerMapper, LockGranularity.SEGMENT);
}
@Override
protected boolean isEndOfShard(String seqNum)
{
return false;
}
@Nullable
@Override
protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
TaskToolbox toolbox,
String checkpointsString
)
{
return null;
}
@Override
protected String getNextStartOffset(String sequenceNumber)
{
return null;
}
@Override
protected SeekableStreamEndSequenceNumbers<String, String> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
{
return null;
}
@Nonnull
@Override
protected List<OrderedPartitionableRecord<String, String, ByteEntity>> getRecords(
RecordSupplier<String, String, ByteEntity> recordSupplier,
TaskToolbox toolbox
)
{
return null;
}
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadata(
SeekableStreamSequenceNumbers<String, String> partitions
)
{
return null;
}
@Override
protected OrderedSequenceNumber<String> createSequenceNumber(String sequenceNumber)
{
return null;
}
@Override
protected void possiblyResetDataSourceMetadata(
TaskToolbox toolbox,
RecordSupplier<String, String, ByteEntity> recordSupplier,
Set<StreamPartition<String>> assignment
)
{
}
@Override
protected boolean isEndOffsetExclusive()
{
return false;
}
@Override
protected TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
{
return null;
}
}
private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
public TestSeekableStreamIndexTask(
String id,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig,
SeekableStreamIndexTaskIOConfig<String, String> ioConfig
)
{
super(id, null, dataSchema, tuningConfig, ioConfig, null, null);
}
@Override
public String getType()
{
return null;
}
@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
return null;
}
@Override
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier()
{
return null;
}
}
private static class TestSeekableStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
{
public TestSeekableStreamIndexTaskIOConfig()
{
super(
null,
"someSequence",
new SeekableStreamStartSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap(), null),
new SeekableStreamEndSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap()),
false,
DateTimes.nowUtc().minusDays(2),
DateTimes.nowUtc(),
new CsvInputFormat(null, null, true, null, 0)
);
}
}
/**
* Usernames used in the tests.
*/
private static class Users
{
private static final String DATASOURCE_READ = "datasourceRead";
private static final String DATASOURCE_WRITE = "datasourceWrite";
}
}

View File

@ -53,41 +53,41 @@ description: Admin users
uniqueMember: uid=admin,ou=Users,dc=example,dc=org
uniqueMember: uid=druid_system,ou=Users,dc=example,dc=org
dn: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
uid: datasourceOnlyUser
cn: datasourceOnlyUser
sn: datasourceOnlyUser
dn: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
uid: datasourceReadOnlyUser
cn: datasourceReadOnlyUser
sn: datasourceReadOnlyUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
homeDirectory: /home/datasourceOnlyUser
homeDirectory: /home/datasourceReadOnlyUser
uidNumber: 3
gidNumber: 3
userPassword: helloworld
dn: cn=datasourceOnlyGroup,ou=Groups,dc=example,dc=org
dn: cn=datasourceReadOnlyGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
cn: datasourceOnlyGroup
description: datasourceOnlyGroup users
uniqueMember: uid=datasourceOnlyUser,ou=Users,dc=example,dc=org
cn: datasourceReadOnlyGroup
description: datasourceReadOnlyGroup users
uniqueMember: uid=datasourceReadOnlyUser,ou=Users,dc=example,dc=org
dn: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
uid: datasourceWithStateUser
cn: datasourceWithStateUser
sn: datasourceWithStateUser
dn: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
uid: datasourceReadWithStateUser
cn: datasourceReadWithStateUser
sn: datasourceReadWithStateUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
homeDirectory: /home/datasourceWithStateUser
homeDirectory: /home/datasourceReadWithStateUser
uidNumber: 4
gidNumber: 4
userPassword: helloworld
dn: cn=datasourceWithStateGroup,ou=Groups,dc=example,dc=org
dn: cn=datasourceReadWithStateGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
cn: datasourceWithStateGroup
description: datasourceWithStateGroup users
uniqueMember: uid=datasourceWithStateUser,ou=Users,dc=example,dc=org
cn: datasourceReadWithStateGroup
description: datasourceReadWithStateGroup users
uniqueMember: uid=datasourceReadWithStateUser,ou=Users,dc=example,dc=org
dn: uid=stateOnlyUser,ou=Users,dc=example,dc=org
uid: stateOnlyUser
@ -137,20 +137,38 @@ uidNumber: 7
gidNumber: 7
userPassword: helloworld
dn: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
uid: datasourceAndSysUser
cn: datasourceAndSysUser
sn: datasourceAndSysUser
dn: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
uid: datasourceReadAndSysUser
cn: datasourceReadAndSysUser
sn: datasourceReadAndSysUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
homeDirectory: /home/datasourceAndSysUser
homeDirectory: /home/datasourceReadAndSysUser
uidNumber: 8
gidNumber: 8
userPassword: helloworld
dn: cn=datasourceWithSysGroup,ou=Groups,dc=example,dc=org
dn: cn=datasourceReadWithSysGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
cn: datasourceWithSysGroup
description: datasourceWithSysGroup users
uniqueMember: uid=datasourceAndSysUser,ou=Users,dc=example,dc=org
cn: datasourceReadWithSysGroup
description: datasourceReadWithSysGroup users
uniqueMember: uid=datasourceReadAndSysUser,ou=Users,dc=example,dc=org
dn: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org
uid: datasourceWriteAndSysUser
cn: datasourceWriteAndSysUser
sn: datasourceWriteAndSysUser
objectClass: top
objectClass: posixAccount
objectClass: inetOrgPerson
homeDirectory: /home/datasourceWriteAndSysUser
uidNumber: 8
gidNumber: 8
userPassword: helloworld
dn: cn=datasourceWriteWithSysGroup,ou=Groups,dc=example,dc=org
objectClass: groupOfUniqueNames
cn: datasourceWriteWithSysGroup
description: datasourceWriteWithSysGroup users
uniqueMember: uid=datasourceWriteAndSysUser,ou=Users,dc=example,dc=org

View File

@ -98,7 +98,7 @@ public abstract class AbstractAuthConfigurationTest
* create a ResourceAction set of permissions that can only read a 'auth_test' datasource, for Authorizer
* implementations which use ResourceAction pattern matching
*/
protected static final List<ResourceAction> DATASOURCE_ONLY_PERMISSIONS = Collections.singletonList(
protected static final List<ResourceAction> DATASOURCE_READ_ONLY_PERMISSIONS = Collections.singletonList(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@ -109,7 +109,7 @@ public abstract class AbstractAuthConfigurationTest
* create a ResourceAction set of permissions that can only read 'auth_test' + partial SYSTEM_TABLE, for Authorizer
* implementations which use ResourceAction pattern matching
*/
protected static final List<ResourceAction> DATASOURCE_SYS_PERMISSIONS = ImmutableList.of(
protected static final List<ResourceAction> DATASOURCE_READ_SYS_PERMISSIONS = ImmutableList.of(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@ -134,11 +134,39 @@ public abstract class AbstractAuthConfigurationTest
)
);
/**
* create a ResourceAction set of permissions that can write datasource 'auth_test'
*/
protected static final List<ResourceAction> DATASOURCE_WRITE_SYS_PERMISSIONS = ImmutableList.of(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.WRITE
),
new ResourceAction(
new Resource("segments", ResourceType.SYSTEM_TABLE),
Action.READ
),
// test missing state permission but having servers permission
new ResourceAction(
new Resource("servers", ResourceType.SYSTEM_TABLE),
Action.READ
),
// test missing state permission but having server_segments permission
new ResourceAction(
new Resource("server_segments", ResourceType.SYSTEM_TABLE),
Action.READ
),
new ResourceAction(
new Resource("tasks", ResourceType.SYSTEM_TABLE),
Action.READ
)
);
/**
* create a ResourceAction set of permissions that can only read 'auth_test' + STATE + SYSTEM_TABLE read access, for
* Authorizer implementations which use ResourceAction pattern matching
*/
protected static final List<ResourceAction> DATASOURCE_SYS_STATE_PERMISSIONS = ImmutableList.of(
protected static final List<ResourceAction> DATASOURCE_READ_SYS_STATE_PERMISSIONS = ImmutableList.of(
new ResourceAction(
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
@ -187,16 +215,18 @@ public abstract class AbstractAuthConfigurationTest
protected CoordinatorResourceTestClient coordinatorClient;
protected HttpClient adminClient;
protected HttpClient datasourceOnlyUserClient;
protected HttpClient datasourceAndSysUserClient;
protected HttpClient datasourceWithStateUserClient;
protected HttpClient datasourceReadOnlyUserClient;
protected HttpClient datasourceReadAndSysUserClient;
protected HttpClient datasourceWriteAndSysUserClient;
protected HttpClient datasourceReadWithStateUserClient;
protected HttpClient stateOnlyUserClient;
protected HttpClient internalSystemClient;
protected abstract void setupDatasourceOnlyUser() throws Exception;
protected abstract void setupDatasourceAndSysTableUser() throws Exception;
protected abstract void setupDatasourceAndSysAndStateUser() throws Exception;
protected abstract void setupDatasourceReadOnlyUser() throws Exception;
protected abstract void setupDatasourceReadAndSysTableUser() throws Exception;
protected abstract void setupDatasourceWriteAndSysTableUser() throws Exception;
protected abstract void setupDatasourceReadAndSysAndStateUser() throws Exception;
protected abstract void setupSysTableAndStateOnlyUser() throws Exception;
protected abstract void setupTestSpecificHttpClients() throws Exception;
protected abstract String getAuthenticatorName();
@ -242,44 +272,44 @@ public abstract class AbstractAuthConfigurationTest
}
@Test
public void test_systemSchemaAccess_datasourceOnlyUser() throws Exception
public void test_systemSchemaAccess_datasourceReadOnlyUser() throws Exception
{
// check that we can access a datasource-permission restricted resource on the broker
HttpUtil.makeRequest(
datasourceOnlyUserClient,
datasourceReadOnlyUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
null
);
// as user that can only read auth_test
LOG.info("Checking sys.segments query as datasourceOnlyUser...");
LOG.info("Checking sys.segments query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
datasourceOnlyUserClient,
datasourceReadOnlyUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
LOG.info("Checking sys.servers query as datasourceOnlyUser...");
LOG.info("Checking sys.servers query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
datasourceOnlyUserClient,
datasourceReadOnlyUserClient,
SYS_SCHEMA_SERVERS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
LOG.info("Checking sys.server_segments query as datasourceOnlyUser...");
LOG.info("Checking sys.server_segments query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
datasourceOnlyUserClient,
datasourceReadOnlyUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
);
LOG.info("Checking sys.tasks query as datasourceOnlyUser...");
LOG.info("Checking sys.tasks query as datasourceReadOnlyUser...");
verifySystemSchemaQueryFailure(
datasourceOnlyUserClient,
datasourceReadOnlyUserClient,
SYS_SCHEMA_TASKS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Allowed:false, Message:\"}"
@ -287,83 +317,119 @@ public abstract class AbstractAuthConfigurationTest
}
@Test
public void test_systemSchemaAccess_datasourceAndSysUser() throws Exception
public void test_systemSchemaAccess_datasourceReadAndSysUser() throws Exception
{
// check that we can access a datasource-permission restricted resource on the broker
HttpUtil.makeRequest(
datasourceAndSysUserClient,
datasourceReadAndSysUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
null
);
// as user that can only read auth_test
LOG.info("Checking sys.segments query as datasourceAndSysUser...");
LOG.info("Checking sys.segments query as datasourceReadAndSysUser...");
verifySystemSchemaQuery(
datasourceAndSysUserClient,
datasourceReadAndSysUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
adminSegments.stream()
.filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
LOG.info("Checking sys.servers query as datasourceAndSysUser...");
LOG.info("Checking sys.servers query as datasourceReadAndSysUser...");
verifySystemSchemaQueryFailure(
datasourceAndSysUserClient,
datasourceReadAndSysUserClient,
SYS_SCHEMA_SERVERS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}"
);
LOG.info("Checking sys.server_segments query as datasourceAndSysUser...");
LOG.info("Checking sys.server_segments query as datasourceReadAndSysUser...");
verifySystemSchemaQueryFailure(
datasourceAndSysUserClient,
datasourceReadAndSysUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}"
);
LOG.info("Checking sys.tasks query as datasourceAndSysUser...");
// Verify that sys.tasks result is empty as it is filtered by Datasource WRITE access
LOG.info("Checking sys.tasks query as datasourceReadAndSysUser...");
verifySystemSchemaQuery(
datasourceAndSysUserClient,
datasourceReadAndSysUserClient,
SYS_SCHEMA_TASKS_QUERY,
Collections.emptyList()
);
}
@Test
public void test_systemSchemaAccess_datasourceWriteAndSysUser() throws Exception
{
// Verify that sys.segments result is empty as it is filtered by Datasource READ access
LOG.info("Checking sys.segments query as datasourceWriteAndSysUser...");
verifySystemSchemaQuery(
datasourceWriteAndSysUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
Collections.emptyList()
);
LOG.info("Checking sys.servers query as datasourceWriteAndSysUser...");
verifySystemSchemaQueryFailure(
datasourceWriteAndSysUserClient,
SYS_SCHEMA_SERVERS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}"
);
LOG.info("Checking sys.server_segments query as datasourceWriteAndSysUser...");
verifySystemSchemaQueryFailure(
datasourceWriteAndSysUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
HttpResponseStatus.FORBIDDEN,
"{\"Access-Check-Result\":\"Insufficient permission to view servers : Allowed:false, Message:\"}"
);
LOG.info("Checking sys.tasks query as datasourceWriteAndSysUser...");
verifySystemSchemaQuery(
datasourceWriteAndSysUserClient,
SYS_SCHEMA_TASKS_QUERY,
adminTasks.stream()
.filter((taskEntry) -> "auth_test".equals(taskEntry.get("datasource")))
.filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
}
@Test
public void test_systemSchemaAccess_datasourceAndSysWithStateUser() throws Exception
public void test_systemSchemaAccess_datasourceReadAndSysWithStateUser() throws Exception
{
// check that we can access a state-permission restricted resource on the broker
HttpUtil.makeRequest(
datasourceWithStateUserClient,
datasourceReadWithStateUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/status",
null
);
// as user that can read auth_test and STATE
LOG.info("Checking sys.segments query as datasourceWithStateUser...");
LOG.info("Checking sys.segments query as datasourceReadWithStateUser...");
verifySystemSchemaQuery(
datasourceWithStateUserClient,
datasourceReadWithStateUserClient,
SYS_SCHEMA_SEGMENTS_QUERY,
adminSegments.stream()
.filter((segmentEntry) -> "auth_test".equals(segmentEntry.get("datasource")))
.collect(Collectors.toList())
);
LOG.info("Checking sys.servers query as datasourceWithStateUser...");
LOG.info("Checking sys.servers query as datasourceReadWithStateUser...");
verifySystemSchemaServerQuery(
datasourceWithStateUserClient,
datasourceReadWithStateUserClient,
SYS_SCHEMA_SERVERS_QUERY,
adminServers
);
LOG.info("Checking sys.server_segments query as datasourceWithStateUser...");
LOG.info("Checking sys.server_segments query as datasourceReadWithStateUser...");
verifySystemSchemaQuery(
datasourceWithStateUserClient,
datasourceReadWithStateUserClient,
SYS_SCHEMA_SERVER_SEGMENTS_QUERY,
adminServerSegments.stream()
.filter((serverSegmentEntry) -> ((String) serverSegmentEntry.get("segment_id")).contains(
@ -371,13 +437,12 @@ public abstract class AbstractAuthConfigurationTest
.collect(Collectors.toList())
);
LOG.info("Checking sys.tasks query as datasourceWithStateUser...");
// Verify that sys.tasks result is empty as it is filtered by Datasource WRITE access
LOG.info("Checking sys.tasks query as datasourceReadWithStateUser...");
verifySystemSchemaQuery(
datasourceWithStateUserClient,
datasourceReadWithStateUserClient,
SYS_SCHEMA_TASKS_QUERY,
adminTasks.stream()
.filter((taskEntry) -> "auth_test".equals(taskEntry.get("datasource")))
.collect(Collectors.toList())
Collections.emptyList()
);
}
@ -500,9 +565,10 @@ public abstract class AbstractAuthConfigurationTest
protected void setupHttpClientsAndUsers() throws Exception
{
setupHttpClients();
setupDatasourceOnlyUser();
setupDatasourceAndSysTableUser();
setupDatasourceAndSysAndStateUser();
setupDatasourceReadOnlyUser();
setupDatasourceReadAndSysTableUser();
setupDatasourceWriteAndSysTableUser();
setupDatasourceReadAndSysAndStateUser();
setupSysTableAndStateOnlyUser();
}
@ -766,18 +832,23 @@ public abstract class AbstractAuthConfigurationTest
httpClient
);
datasourceOnlyUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceOnlyUser", "helloworld"),
datasourceReadOnlyUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceReadOnlyUser", "helloworld"),
httpClient
);
datasourceAndSysUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceAndSysUser", "helloworld"),
datasourceReadAndSysUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceReadAndSysUser", "helloworld"),
httpClient
);
datasourceWithStateUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceWithStateUser", "helloworld"),
datasourceWriteAndSysUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceWriteAndSysUser", "helloworld"),
httpClient
);
datasourceReadWithStateUserClient = new CredentialedHttpClient(
new BasicCredentials("datasourceReadWithStateUser", "helloworld"),
httpClient
);

View File

@ -70,38 +70,50 @@ public class ITBasicAuthConfigurationTest extends AbstractAuthConfigurationTest
}
@Override
protected void setupDatasourceOnlyUser() throws Exception
protected void setupDatasourceReadOnlyUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
"datasourceOnlyUser",
"datasourceReadOnlyUser",
"helloworld",
"datasourceOnlyRole",
DATASOURCE_ONLY_PERMISSIONS
"datasourceReadOnlyRole",
DATASOURCE_READ_ONLY_PERMISSIONS
);
}
@Override
protected void setupDatasourceAndSysTableUser() throws Exception
protected void setupDatasourceReadAndSysTableUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
"datasourceAndSysUser",
"datasourceReadAndSysUser",
"helloworld",
"datasourceAndSysRole",
DATASOURCE_SYS_PERMISSIONS
"datasourceReadAndSysRole",
DATASOURCE_READ_SYS_PERMISSIONS
);
}
@Override
protected void setupDatasourceAndSysAndStateUser() throws Exception
protected void setupDatasourceWriteAndSysTableUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
"datasourceWithStateUser",
"datasourceWriteAndSysUser",
"helloworld",
"datasourceWithStateRole",
DATASOURCE_SYS_STATE_PERMISSIONS
"datasourceWriteAndSysRole",
DATASOURCE_WRITE_SYS_PERMISSIONS
);
}
@Override
protected void setupDatasourceReadAndSysAndStateUser() throws Exception
{
createUserAndRoleWithPermissions(
adminClient,
"datasourceReadWithStateUser",
"helloworld",
"datasourceReadWithStateRole",
DATASOURCE_READ_SYS_STATE_PERMISSIONS
);
}

View File

@ -120,29 +120,38 @@ public class ITBasicAuthLdapConfigurationTest extends AbstractAuthConfigurationT
@Override
protected void setupDatasourceOnlyUser() throws Exception
protected void setupDatasourceReadOnlyUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
"datasourceOnlyGroup",
ImmutableMap.of("datasourceOnlyRole", DATASOURCE_ONLY_PERMISSIONS)
"datasourceReadOnlyGroup",
ImmutableMap.of("datasourceReadOnlyRole", DATASOURCE_READ_ONLY_PERMISSIONS)
);
}
@Override
protected void setupDatasourceAndSysTableUser() throws Exception
protected void setupDatasourceReadAndSysTableUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
"datasourceWithSysGroup",
ImmutableMap.of("datasourceWithSysRole", DATASOURCE_SYS_PERMISSIONS)
"datasourceReadWithSysGroup",
ImmutableMap.of("datasourceReadWithSysRole", DATASOURCE_READ_SYS_PERMISSIONS)
);
}
@Override
protected void setupDatasourceAndSysAndStateUser() throws Exception
protected void setupDatasourceWriteAndSysTableUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
"datasourceWithStateGroup",
ImmutableMap.of("datasourceWithStateRole", DATASOURCE_SYS_STATE_PERMISSIONS)
"datasourceWriteWithSysGroup",
ImmutableMap.of("datasourceWriteWithSysRole", DATASOURCE_WRITE_SYS_PERMISSIONS)
);
}
@Override
protected void setupDatasourceReadAndSysAndStateUser() throws Exception
{
createRoleWithPermissionsAndGroupMapping(
"datasourceReadWithStateGroup",
ImmutableMap.of("datasourceReadWithStateRole", DATASOURCE_READ_SYS_STATE_PERMISSIONS)
);
}

View File

@ -872,7 +872,7 @@ public class SystemSchema extends AbstractSchema
);
Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = task -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource()));
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(task.getDataSource()));
final Iterable<TaskStatusPlus> authorizedTasks = AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
@ -1014,7 +1014,7 @@ public class SystemSchema extends AbstractSchema
);
Function<SupervisorStatus, Iterable<ResourceAction>> raGenerator = supervisor -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource()));
AuthorizationUtils.DATASOURCE_WRITE_RA_GENERATOR.apply(supervisor.getSource()));
final Iterable<SupervisorStatus> authorizedSupervisors = AuthorizationUtils.filterAuthorizedResources(
authenticationResult,

View File

@ -81,9 +81,12 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
@ -196,14 +199,7 @@ public class SystemSchemaTest extends CalciteTestBase
.addMockedMethod("getStatus")
.createMock();
request = EasyMock.createMock(Request.class);
authMapper = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> new Access(true);
}
};
authMapper = createAuthMapper();
final File tmpDir = temporaryFolder.newFolder();
final QueryableIndex index1 = IndexBuilder.create()
@ -554,33 +550,7 @@ public class SystemSchemaTest extends CalciteTestBase
EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
@ -831,33 +801,7 @@ public class SystemSchemaTest extends CalciteTestBase
indexerNodeDiscovery
);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = serversTable.scan(dataContext).toList();
rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
@ -1112,32 +1056,7 @@ public class SystemSchemaTest extends CalciteTestBase
.andReturn(immutableDruidServers)
.once();
EasyMock.replay(serverView);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
DataContext dataContext = createDataContext(Users.SUPER);
//server_segments table is the join of servers and segments table
// it will have 5 rows as follows
@ -1230,32 +1149,7 @@ public class SystemSchemaTest extends CalciteTestBase
responseHolder.done();
EasyMock.replay(client, request, responseHandler);
DataContext dataContext = new DataContext()
{
@Override
public SchemaPlus getRootSchema()
{
return null;
}
@Override
public JavaTypeFactory getTypeFactory()
{
return null;
}
@Override
public QueryProvider getQueryProvider()
{
return null;
}
@Override
public Object get(String name)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
}
};
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = tasksTable.scan(dataContext).toList();
Object[] row0 = rows.get(0);
@ -1294,6 +1188,81 @@ public class SystemSchemaTest extends CalciteTestBase
verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);
}
@Test
public void testTasksTableAuth() throws Exception
{
SystemSchema.TasksTable tasksTable = new SystemSchema.TasksTable(client, mapper, authMapper);
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks"))
.andReturn(request)
.anyTimes();
String json = "[{\n"
+ "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ "\t\"groupId\": \"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+ "\t\"type\": \"index\",\n"
+ "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n"
+ "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n"
+ "\t\"statusCode\": \"FAILED\",\n"
+ "\t\"runnerStatusCode\": \"NONE\",\n"
+ "\t\"duration\": -1,\n"
+ "\t\"location\": {\n"
+ "\t\t\"host\": \"testHost\",\n"
+ "\t\t\"port\": 1234,\n"
+ "\t\t\"tlsPort\": -1\n"
+ "\t},\n"
+ "\t\"dataSource\": \"wikipedia\",\n"
+ "\t\"errorMsg\": null\n"
+ "}, {\n"
+ "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ "\t\"groupId\": \"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+ "\t\"type\": \"index\",\n"
+ "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n"
+ "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n"
+ "\t\"statusCode\": \"RUNNING\",\n"
+ "\t\"runnerStatusCode\": \"RUNNING\",\n"
+ "\t\"duration\": null,\n"
+ "\t\"location\": {\n"
+ "\t\t\"host\": \"192.168.1.6\",\n"
+ "\t\t\"port\": 8100,\n"
+ "\t\t\"tlsPort\": -1\n"
+ "\t},\n"
+ "\t\"dataSource\": \"wikipedia\",\n"
+ "\t\"errorMsg\": null\n"
+ "}]";
HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
.andReturn(createFullResponseHolder(httpResp, json))
.andReturn(createFullResponseHolder(httpResp, json))
.andReturn(createFullResponseHolder(httpResp, json));
EasyMock.expect(request.getUrl())
.andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks"))
.anyTimes();
EasyMock.replay(client, request, responseHandler);
// Verify that no row is returned for Datasource Read user
List<Object[]> rows = tasksTable
.scan(createDataContext(Users.DATASOURCE_READ))
.toList();
Assert.assertTrue(rows.isEmpty());
// Verify that 2 rows are is returned for Datasource Write user
rows = tasksTable
.scan(createDataContext(Users.DATASOURCE_WRITE))
.toList();
Assert.assertEquals(2, rows.size());
// Verify that 2 rows are returned for Super user
rows = tasksTable
.scan(createDataContext(Users.SUPER))
.toList();
Assert.assertEquals(2, rows.size());
}
@Test
public void testSupervisorTable() throws Exception
{
@ -1337,7 +1306,111 @@ public class SystemSchemaTest extends CalciteTestBase
responseHolder.done();
EasyMock.replay(client, request, responseHandler);
DataContext dataContext = new DataContext()
DataContext dataContext = createDataContext(Users.SUPER);
final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
Object[] row0 = rows.get(0);
Assert.assertEquals("wikipedia", row0[0].toString());
Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
Assert.assertEquals(0L, row0[3]);
Assert.assertEquals("kafka", row0[4].toString());
Assert.assertEquals("wikipedia", row0[5].toString());
Assert.assertEquals(0L, row0[6]);
Assert.assertEquals(
"{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
row0[7].toString()
);
// Verify value types.
verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
}
@Test
public void testSupervisorTableAuth() throws Exception
{
SystemSchema.SupervisorsTable supervisorTable =
new SystemSchema.SupervisorsTable(client, mapper, createAuthMapper());
EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system"))
.andReturn(request)
.anyTimes();
final String json = "[{\n"
+ "\t\"id\": \"wikipedia\",\n"
+ "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n"
+ "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n"
+ "\t\"healthy\": false,\n"
+ "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}"
+ ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n"
+ "\t\"type\": \"kafka\",\n"
+ "\t\"source\": \"wikipedia\",\n"
+ "\t\"suspended\": false\n"
+ "}]";
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject()))
.andReturn(createFullResponseHolder(httpResponse, json))
.andReturn(createFullResponseHolder(httpResponse, json))
.andReturn(createFullResponseHolder(httpResponse, json));
EasyMock.expect(responseHandler.getStatus())
.andReturn(httpResponse.getStatus().getCode())
.anyTimes();
EasyMock.expect(request.getUrl())
.andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system"))
.anyTimes();
EasyMock.replay(client, request, responseHandler);
// Verify that no row is returned for Datasource Read user
List<Object[]> rows = supervisorTable
.scan(createDataContext(Users.DATASOURCE_READ))
.toList();
Assert.assertTrue(rows.isEmpty());
// Verify that 1 row is returned for Datasource Write user
rows = supervisorTable
.scan(createDataContext(Users.DATASOURCE_WRITE))
.toList();
Assert.assertEquals(1, rows.size());
// Verify that 1 row is returned for Super user
rows = supervisorTable
.scan(createDataContext(Users.SUPER))
.toList();
Assert.assertEquals(1, rows.size());
// TODO: If needed, verify the first row here
// TODO: Verify value types.
// verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
}
/**
* Creates a response holder that contains the given json.
*/
private InputStreamFullResponseHolder createFullResponseHolder(
HttpResponse httpResponse,
String json
)
{
InputStreamFullResponseHolder responseHolder =
new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse);
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
responseHolder.addChunk(bytesToWrite);
responseHolder.done();
return responseHolder;
}
/**
* Creates a DataContext for the given username.
*/
private DataContext createDataContext(String username)
{
return new DataContext()
{
@Override
public SchemaPlus getRootSchema()
@ -1358,28 +1431,40 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Override
public Object get(String name)
public Object get(String authorizerName)
{
return CalciteTests.SUPER_USER_AUTH_RESULT;
return CalciteTests.TEST_SUPERUSER_NAME.equals(username)
? CalciteTests.SUPER_USER_AUTH_RESULT
: new AuthenticationResult(username, authorizerName, null, null);
}
};
final List<Object[]> rows = supervisorTable.scan(dataContext).toList();
}
Object[] row0 = rows.get(0);
Assert.assertEquals("wikipedia", row0[0].toString());
Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString());
Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString());
Assert.assertEquals(0L, row0[3]);
Assert.assertEquals("kafka", row0[4].toString());
Assert.assertEquals("wikipedia", row0[5].toString());
Assert.assertEquals(0L, row0[6]);
Assert.assertEquals(
"{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}",
row0[7].toString()
private AuthorizerMapper createAuthMapper()
{
return new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
final String username = authenticationResult.getIdentity();
// Allow access to a Datasource if
// - any user requests Read access
// - Super User or Datasource Write User requests Write access
if (resource.getType().equals(ResourceType.DATASOURCE)) {
return new Access(
action == Action.READ
|| username.equals(Users.SUPER)
|| username.equals(Users.DATASOURCE_WRITE)
);
}
// Verify value types.
verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE);
return new Access(true);
};
}
};
}
private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
@ -1443,4 +1528,14 @@ public class SystemSchemaTest extends CalciteTestBase
}
}
}
/**
* Usernames to be used in tests.
*/
private static class Users
{
private static final String SUPER = CalciteTests.TEST_SUPERUSER_NAME;
private static final String DATASOURCE_READ = "datasourceRead";
private static final String DATASOURCE_WRITE = "datasourceWrite";
}
}