HADOOP-18183. s3a audit logs to publish range start/end of GET requests. (#5110)
The start and end of the range is set in a new audit param "rg", e.g "?rg=100-200" Contributed by Ankit Saurabh
This commit is contained in:
parent
85ec7969a7
commit
1cecf8ab70
|
@ -90,6 +90,11 @@ public final class AuditConstants {
|
|||
*/
|
||||
public static final String PARAM_PROCESS = "ps";
|
||||
|
||||
/**
|
||||
* Header: Range for GET request data: {@value}.
|
||||
*/
|
||||
public static final String PARAM_RANGE = "rg";
|
||||
|
||||
/**
|
||||
* Task Attempt ID query header: {@value}.
|
||||
*/
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
import com.amazonaws.AmazonWebServiceRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.fs.audit.CommonAuditContext;
|
|||
import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
|
||||
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
|
||||
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
|
||||
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
|
@ -110,6 +112,14 @@ public class LoggingAuditor
|
|||
*/
|
||||
private Collection<String> filters;
|
||||
|
||||
/**
|
||||
* Log for warning of problems getting the range of GetObjectRequest
|
||||
* will only log of a problem once per process instance.
|
||||
* This is to avoid logs being flooded with errors.
|
||||
*/
|
||||
private static final LogExactlyOnce WARN_INCORRECT_RANGE =
|
||||
new LogExactlyOnce(LOG);
|
||||
|
||||
/**
|
||||
* Create the auditor.
|
||||
* The UGI current user is used to provide the principal;
|
||||
|
@ -230,6 +240,26 @@ public class LoggingAuditor
|
|||
|
||||
private final HttpReferrerAuditHeader referrer;
|
||||
|
||||
/**
|
||||
* Attach Range of data for GetObject Request.
|
||||
* @param request given get object request
|
||||
*/
|
||||
private void attachRangeFromRequest(AmazonWebServiceRequest request) {
|
||||
if (request instanceof GetObjectRequest) {
|
||||
long[] rangeValue = ((GetObjectRequest) request).getRange();
|
||||
if (rangeValue == null || rangeValue.length == 0) {
|
||||
return;
|
||||
}
|
||||
if (rangeValue.length != 2) {
|
||||
WARN_INCORRECT_RANGE.warn("Expected range to contain 0 or 2 elements."
|
||||
+ " Got {} elements. Ignoring.", rangeValue.length);
|
||||
return;
|
||||
}
|
||||
String combinedRangeValue = String.format("%d-%d", rangeValue[0], rangeValue[1]);
|
||||
referrer.set(AuditConstants.PARAM_RANGE, combinedRangeValue);
|
||||
}
|
||||
}
|
||||
|
||||
private final String description;
|
||||
|
||||
private LoggingAuditSpan(
|
||||
|
@ -314,6 +344,8 @@ public class LoggingAuditor
|
|||
@Override
|
||||
public <T extends AmazonWebServiceRequest> T beforeExecution(
|
||||
final T request) {
|
||||
// attach range for GetObject requests
|
||||
attachRangeFromRequest(request);
|
||||
// build the referrer header
|
||||
final String header = referrer.buildHttpReferrer();
|
||||
// update the outer class's field.
|
||||
|
|
|
@ -232,6 +232,7 @@ If any of the field values were `null`, the field is omitted.
|
|||
| `p2` | Path 2 of operation | `s3a://alice-london/path2` |
|
||||
| `pr` | Principal | `alice` |
|
||||
| `ps` | Unique process UUID | `235865a0-d399-4696-9978-64568db1b51c` |
|
||||
| `rg` | GET request range | `100-200` |
|
||||
| `ta` | Task Attempt ID (S3A committer) | |
|
||||
| `t0` | Thread 0: thread span was created in | `100` |
|
||||
| `t1` | Thread 1: thread this operation was executed in | `200` |
|
||||
|
|
|
@ -20,8 +20,10 @@ package org.apache.hadoop.fs.s3a.audit;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -138,6 +140,17 @@ public abstract class AbstractAuditingTest extends AbstractHadoopTestBase {
|
|||
requestFactory.newGetObjectMetadataRequest("/"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a GetObject request and modify it before passing it through auditor.
|
||||
* @param modifyRequest Consumer Interface for changing the request before passing to the auditor
|
||||
* @return the request
|
||||
*/
|
||||
protected GetObjectRequest get(Consumer<GetObjectRequest> modifyRequest) {
|
||||
GetObjectRequest req = requestFactory.newGetObjectRequest("/");
|
||||
modifyRequest.accept(req);
|
||||
return manager.beforeExecution(req);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert a head request fails as there is no
|
||||
* active span.
|
||||
|
@ -210,4 +223,15 @@ public abstract class AbstractAuditingTest extends AbstractHadoopTestBase {
|
|||
.isEqualTo(expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert the map does not contain the key, i.e, it is null.
|
||||
* @param params map of params
|
||||
* @param key key
|
||||
*/
|
||||
protected void assertMapNotContains(final Map<String, String> params, final String key) {
|
||||
assertThat(params.get(key))
|
||||
.describedAs(key)
|
||||
.isNull();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.Map;
|
|||
import java.util.regex.Matcher;
|
||||
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -46,6 +47,7 @@ import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_OP;
|
|||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH2;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PRINCIPAL;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_RANGE;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD1;
|
||||
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
|
||||
|
@ -115,6 +117,7 @@ public class TestHttpReferrerAuditHeader extends AbstractAuditingTest {
|
|||
assertThat(span.getTimestamp())
|
||||
.describedAs("Timestamp of " + span)
|
||||
.isEqualTo(ts);
|
||||
assertMapNotContains(params, PARAM_RANGE);
|
||||
|
||||
assertMapContains(params, PARAM_TIMESTAMP,
|
||||
Long.toString(ts));
|
||||
|
@ -309,6 +312,44 @@ public class TestHttpReferrerAuditHeader extends AbstractAuditingTest {
|
|||
expectStrippedField("\"\"\"b\"", "b");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that correct range is getting published in header.
|
||||
*/
|
||||
@Test
|
||||
public void testGetObjectRange() throws Throwable {
|
||||
AuditSpan span = span();
|
||||
GetObjectRequest request = get(getObjectRequest -> getObjectRequest.setRange(100, 200));
|
||||
Map<String, String> headers
|
||||
= request.getCustomRequestHeaders();
|
||||
assertThat(headers)
|
||||
.describedAs("Custom headers")
|
||||
.containsKey(HEADER_REFERRER);
|
||||
String header = headers.get(HEADER_REFERRER);
|
||||
LOG.info("Header is {}", header);
|
||||
Map<String, String> params
|
||||
= HttpReferrerAuditHeader.extractQueryParameters(header);
|
||||
assertMapContains(params, PARAM_RANGE, "100-200");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that no range is getting added to the header in request without range.
|
||||
*/
|
||||
@Test
|
||||
public void testGetObjectWithoutRange() throws Throwable {
|
||||
AuditSpan span = span();
|
||||
GetObjectRequest request = get(getObjectRequest -> {});
|
||||
Map<String, String> headers
|
||||
= request.getCustomRequestHeaders();
|
||||
assertThat(headers)
|
||||
.describedAs("Custom headers")
|
||||
.containsKey(HEADER_REFERRER);
|
||||
String header = headers.get(HEADER_REFERRER);
|
||||
LOG.info("Header is {}", header);
|
||||
Map<String, String> params
|
||||
= HttpReferrerAuditHeader.extractQueryParameters(header);
|
||||
assertMapNotContains(params, PARAM_RANGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Expect a field with quote stripping to match the expected value.
|
||||
* @param str string to strip
|
||||
|
|
Loading…
Reference in New Issue