Add identity to query metrics, logs. (#4862)

* Add identity to query metrics, logs.

Also fix a bug where unauthorized requests would not emit any logs or metrics,
and instead would log a "Tried to emit logs and metrics twice" warning.

Also rename QueryResource's "getServer" to "cancelQuery", because that's what
it does.

* Do not emit identity by default.
This commit is contained in:
Gian Merlino 2017-09-28 11:45:23 -07:00 committed by GitHub
parent fbd4cd633b
commit a19f22b5bb
9 changed files with 245 additions and 76 deletions

View File

@ -183,6 +183,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
// Emit nothing by default.
}
@Override
public void identity(String identity)
{
// Emit nothing by default.
}
@Override
public BitmapResultFactory<?> makeBitmapResultFactory(BitmapFactory factory)
{

View File

@ -207,6 +207,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
void postFilters(List<Filter> postFilters);
/**
* Sets identity of the requester for a query. See {@code AuthenticationResult}.
*/
void identity(String identity);
/**
* Creates a {@link BitmapResultFactory} which may record some information along bitmap construction from {@link
* #preFilters(List)}. The returned BitmapResultFactory may add some dimensions to this QueryMetrics from it's {@link

View File

@ -42,8 +42,8 @@ import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access;
import io.druid.server.security.AuthenticationResult;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@ -80,6 +80,7 @@ public class QueryLifecycle
private final long startNs;
private State state = State.NEW;
private AuthenticationResult authenticationResult;
private QueryToolChest toolChest;
private QueryPlus queryPlus;
@ -111,9 +112,9 @@ public class QueryLifecycle
* is unauthorized, an IllegalStateException will be thrown. Logs and metrics are emitted when the Sequence is
* either fully iterated or throws an exception.
*
* @param query the query
* @param authenticationResult authentication result indicating identity of the requester
* @param remoteAddress remote address, for logging; or null if unknown
* @param query the query
* @param authenticationResult authentication result indicating identity of the requester
* @param remoteAddress remote address, for logging; or null if unknown
*
* @return results
*/
@ -185,63 +186,65 @@ public class QueryLifecycle
* @param authenticationResult authentication result indicating the identity of the requester
*
* @return authorization result
*
* */
public Access authorize(
final AuthenticationResult authenticationResult
)
*/
public Access authorize(final AuthenticationResult authenticationResult)
{
transition(State.INITIALIZED, State.AUTHORIZING);
Access authResult = AuthorizationUtils.authorizeAllResourceActions(
return doAuthorize(
authenticationResult,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
AuthorizationUtils.authorizeAllResourceActions(
authenticationResult,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
)
);
if (!authResult.isAllowed()) {
// Not authorized; go straight to Jail, do not pass Go.
transition(State.AUTHORIZING, State.DONE);
} else {
transition(State.AUTHORIZING, State.AUTHORIZED);
}
return authResult;
}
/**
* Authorize the query. Will return an Access object denoting whether the query is authorized or not.
*
* @param token authentication token from the request
* @param namespace namespace of the authentication token
* @param req HTTP request object of the request. If provided, the auth-related fields in the HTTP request
* will be automatically set.
*
* @return authorization result
*
* */
public Access authorize(
@Nullable HttpServletRequest req
)
*/
public Access authorize(HttpServletRequest req)
{
transition(State.INITIALIZED, State.AUTHORIZING);
Access authResult = AuthorizationUtils.authorizeAllResourceActions(
req,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
return doAuthorize(
AuthorizationUtils.authenticationResultFromRequest(req),
AuthorizationUtils.authorizeAllResourceActions(
req,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
)
);
}
if (!authResult.isAllowed()) {
private Access doAuthorize(final AuthenticationResult authenticationResult, final Access authorizationResult)
{
if (!authorizationResult.isAllowed()) {
// Not authorized; go straight to Jail, do not pass Go.
transition(State.AUTHORIZING, State.DONE);
transition(State.AUTHORIZING, State.UNAUTHORIZED);
} else {
transition(State.AUTHORIZING, State.AUTHORIZED);
}
return authResult;
this.authenticationResult = authenticationResult;
final QueryMetrics queryMetrics = queryPlus.getQueryMetrics();
if (queryMetrics != null) {
queryMetrics.identity(authenticationResult.getIdentity());
}
return authorizationResult;
}
/**
@ -314,6 +317,9 @@ public class QueryLifecycle
statsMap.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
statsMap.put("query/bytes", bytesWritten);
statsMap.put("success", success);
if (authenticationResult != null) {
statsMap.put("identity", authenticationResult.getIdentity());
}
if (e != null) {
statsMap.put("exception", e.toString());
@ -360,6 +366,7 @@ public class QueryLifecycle
AUTHORIZING,
AUTHORIZED,
EXECUTING,
UNAUTHORIZED,
DONE
}

View File

@ -125,7 +125,7 @@ public class QueryResource implements QueryCountStatsProvider
@DELETE
@Path("{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response getServer(@PathParam("id") String queryId, @Context final HttpServletRequest req)
public Response cancelQuery(@PathParam("id") String queryId, @Context final HttpServletRequest req)
{
if (log.isDebugEnabled()) {
log.debug("Received cancel request for query [%s]", queryId);

View File

@ -22,6 +22,7 @@ package io.druid.server;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -39,4 +40,29 @@ public class QueryStats
{
return stats;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final QueryStats that = (QueryStats) o;
return Objects.equals(stats, that.stats);
}
@Override
public int hashCode()
{
return Objects.hash(stats);
}
@Override
public String toString()
{
return String.valueOf(stats);
}
}

View File

@ -27,6 +27,7 @@ import io.druid.query.Query;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.Objects;
public class RequestLogLine
{
@ -80,4 +81,37 @@ public class RequestLogLine
{
return queryStats;
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final RequestLogLine that = (RequestLogLine) o;
return Objects.equals(timestamp, that.timestamp) &&
Objects.equals(remoteAddr, that.remoteAddr) &&
Objects.equals(query, that.query) &&
Objects.equals(queryStats, that.queryStats);
}
@Override
public int hashCode()
{
return Objects.hash(timestamp, remoteAddr, query, queryStats);
}
@Override
public String toString()
{
return "RequestLogLine{" +
"timestamp=" + timestamp +
", remoteAddr='" + remoteAddr + '\'' +
", query=" + query +
", queryStats=" + queryStats +
'}';
}
}

View File

@ -44,9 +44,10 @@ public class AuthorizationUtils
*
* If this attribute is already set when this function is called, an exception is thrown.
*
* @param request HTTP request to be authorized
* @param resourceAction A resource identifier and the action to be taken the resource.
* @param request HTTP request to be authorized
* @param resourceAction A resource identifier and the action to be taken the resource.
* @param authorizerMapper The singleton AuthorizerMapper instance
*
* @return ACCESS_OK or the failed Access object returned by the Authorizer that checked the request.
*/
public static Access authorizeResourceAction(
@ -62,6 +63,28 @@ public class AuthorizationUtils
);
}
/**
* Returns the authentication information for a request.
*
* @param request http request
*
* @return authentication result
*
* @throws IllegalStateException if the request was not authenticated
*/
public static AuthenticationResult authenticationResultFromRequest(final HttpServletRequest request)
{
final AuthenticationResult authenticationResult = (AuthenticationResult) request.getAttribute(
AuthConfig.DRUID_AUTHENTICATION_RESULT
);
if (authenticationResult == null) {
throw new ISE("Null authentication result");
}
return authenticationResult;
}
/**
* Check a list of resource-actions to be performed by the identity represented by authenticationResult.
*
@ -71,7 +94,8 @@ public class AuthorizationUtils
* Otherwise, return ACCESS_OK if all resource-actions were successfully authorized.
*
* @param authenticationResult Authentication result representing identity of requester
* @param resourceActions An Iterable of resource-actions to authorize
* @param resourceActions An Iterable of resource-actions to authorize
*
* @return ACCESS_OK or the Access object from the first failed check
*/
public static Access authorizeAllResourceActions(
@ -119,8 +143,9 @@ public class AuthorizationUtils
*
* If this attribute is already set when this function is called, an exception is thrown.
*
* @param request HTTP request to be authorized
* @param request HTTP request to be authorized
* @param resourceActions An Iterable of resource-actions to authorize
*
* @return ACCESS_OK or the Access object from the first failed check
*/
public static Access authorizeAllResourceActions(
@ -133,15 +158,8 @@ public class AuthorizationUtils
throw new ISE("Request already had authorization check.");
}
final AuthenticationResult authenticationResult = (AuthenticationResult) request.getAttribute(
AuthConfig.DRUID_AUTHENTICATION_RESULT
);
if (authenticationResult == null) {
throw new ISE("Null authentication result");
}
Access access = authorizeAllResourceActions(
authenticationResult,
authenticationResultFromRequest(request),
resourceActions,
authorizerMapper
);
@ -168,12 +186,12 @@ public class AuthorizationUtils
*
* If this attribute is already set when this function is called, an exception is thrown.
*
* @param request HTTP request to be authorized
* @param resources resources to be processed into resource-actions
* @param request HTTP request to be authorized
* @param resources resources to be processed into resource-actions
* @param resourceActionGenerator Function that creates an iterable of resource-actions from a resource
* @param authorizerMapper authorizer mapper
* @return Iterable containing resources that were authorized
* @param authorizerMapper authorizer mapper
*
* @return Iterable containing resources that were authorized
*/
public static <ResType> Iterable<ResType> filterAuthorizedResources(
final HttpServletRequest request,
@ -186,12 +204,7 @@ public class AuthorizationUtils
throw new ISE("Request already had authorization check.");
}
final AuthenticationResult authenticationResult = (AuthenticationResult) request.getAttribute(
AuthConfig.DRUID_AUTHENTICATION_RESULT
);
if (authenticationResult == null) {
throw new ISE("Null authentication result");
}
final AuthenticationResult authenticationResult = authenticationResultFromRequest(request);
final Authorizer authorizer = authorizerMapper.getAuthorizer(authenticationResult.getAuthorizerName());
if (authorizer == null) {
@ -231,7 +244,6 @@ public class AuthorizationUtils
}
/**
* Function for the common pattern of generating a resource-action for reading from a datasource, using the
* datasource name.

View File

@ -19,6 +19,7 @@
package io.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@ -38,9 +39,11 @@ import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.NoopRequestLogger;
import io.druid.server.log.TestRequestLogger;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
@ -63,8 +66,11 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -125,6 +131,7 @@ public class QueryResourceTest
private QueryResource queryResource;
private QueryManager queryManager;
private TestRequestLogger testRequestLogger;
@BeforeClass
public static void staticSetup()
@ -139,13 +146,14 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryManager = new QueryManager();
testRequestLogger = new TestRequestLogger();
queryResource = new QueryResource(
new QueryLifecycleFactory(
warehouse,
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
testRequestLogger,
serverConfig,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
@ -229,7 +237,8 @@ public class QueryResourceTest
EasyMock.replay(testServletRequest);
AuthorizerMapper authMapper = new AuthorizerMapper(null) {
AuthorizerMapper authMapper = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
@ -255,7 +264,7 @@ public class QueryResourceTest
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
testRequestLogger,
serverConfig,
new AuthConfig(null, null, null),
authMapper
@ -286,12 +295,22 @@ public class QueryResourceTest
testServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
((StreamingOutput) response.getEntity()).write(baos);
final List<Result<TimeBoundaryResultValue>> responses = jsonMapper.readValue(
baos.toByteArray(),
new TypeReference<List<Result<TimeBoundaryResultValue>>>() {}
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Assert.assertEquals(0, responses.size());
Assert.assertEquals(1, testRequestLogger.getLogs().size());
Assert.assertEquals(true, testRequestLogger.getLogs().get(0).getQueryStats().getStats().get("success"));
Assert.assertEquals("druid", testRequestLogger.getLogs().get(0).getQueryStats().getStats().get("identity"));
}
@Test(timeout = 60_000L)
public void testSecuredGetServer() throws Exception
public void testSecuredCancelQuery() throws Exception
{
final CountDownLatch waitForCancellationLatch = new CountDownLatch(1);
final CountDownLatch waitFinishLatch = new CountDownLatch(2);
@ -311,7 +330,8 @@ public class QueryResourceTest
EasyMock.replay(testServletRequest);
AuthorizerMapper authMapper = new AuthorizerMapper(null) {
AuthorizerMapper authMapper = new AuthorizerMapper(null)
{
@Override
public Authorizer getAuthorizer(String name)
{
@ -352,7 +372,7 @@ public class QueryResourceTest
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
testRequestLogger,
serverConfig,
new AuthConfig(null, null, null),
authMapper
@ -404,7 +424,7 @@ public class QueryResourceTest
@Override
public void run()
{
Response response = queryResource.getServer("id_1", testServletRequest);
Response response = queryResource.cancelQuery("id_1", testServletRequest);
Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
waitForCancellationLatch.countDown();
waitFinishLatch.countDown();
@ -416,7 +436,7 @@ public class QueryResourceTest
}
@Test(timeout = 60_000L)
public void testDenySecuredGetServer() throws Exception
public void testDenySecuredCancelQuery() throws Exception
{
final CountDownLatch waitForCancellationLatch = new CountDownLatch(1);
final CountDownLatch waitFinishLatch = new CountDownLatch(2);
@ -474,7 +494,7 @@ public class QueryResourceTest
testSegmentWalker,
new DefaultGenericQueryMetricsFactory(jsonMapper),
new NoopServiceEmitter(),
new NoopRequestLogger(),
testRequestLogger,
serverConfig,
new AuthConfig(null, null, null),
authMapper
@ -527,7 +547,7 @@ public class QueryResourceTest
public void run()
{
try {
queryResource.getServer("id_1", testServletRequest);
queryResource.cancelQuery("id_1", testServletRequest);
}
catch (ForbiddenException e) {
waitForCancellationLatch.countDown();

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.log;
import com.google.common.collect.ImmutableList;
import io.druid.server.RequestLogLine;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestRequestLogger implements RequestLogger
{
private final List<RequestLogLine> logs;
public TestRequestLogger()
{
this.logs = new ArrayList<>();
}
@Override
public void log(final RequestLogLine requestLogLine) throws IOException
{
synchronized (logs) {
logs.add(requestLogLine);
}
}
public List<RequestLogLine> getLogs()
{
synchronized (logs) {
return ImmutableList.copyOf(logs);
}
}
public void clear()
{
synchronized (logs) {
logs.clear();
}
}
}