HADOOP-10756. KMS audit log should consolidate successful similar requests. (asuresh via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619541 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c6eee38b81
commit
b781c3bc88
|
@ -163,6 +163,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HADOOP-10891. Add EncryptedKeyVersion factory method to
|
||||
KeyProviderCryptoExtension. (wang)
|
||||
|
||||
HADOOP-10756. KMS audit log should consolidate successful similar requests.
|
||||
(asuresh via tucu)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms.server;
|
|||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
|
||||
|
@ -27,7 +28,6 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
|
@ -59,22 +59,25 @@ import java.util.Map;
|
|||
@Path(KMSRESTConstants.SERVICE_VERSION)
|
||||
@InterfaceAudience.Private
|
||||
public class KMS {
|
||||
private static final String CREATE_KEY = "CREATE_KEY";
|
||||
private static final String DELETE_KEY = "DELETE_KEY";
|
||||
private static final String ROLL_NEW_VERSION = "ROLL_NEW_VERSION";
|
||||
private static final String GET_KEYS = "GET_KEYS";
|
||||
private static final String GET_KEYS_METADATA = "GET_KEYS_METADATA";
|
||||
private static final String GET_KEY_VERSION = "GET_KEY_VERSION";
|
||||
private static final String GET_CURRENT_KEY = "GET_CURRENT_KEY";
|
||||
private static final String GET_KEY_VERSIONS = "GET_KEY_VERSIONS";
|
||||
private static final String GET_METADATA = "GET_METADATA";
|
||||
private static final String GENERATE_EEK = "GENERATE_EEK";
|
||||
private static final String DECRYPT_EEK = "DECRYPT_EEK";
|
||||
public static final String CREATE_KEY = "CREATE_KEY";
|
||||
public static final String DELETE_KEY = "DELETE_KEY";
|
||||
public static final String ROLL_NEW_VERSION = "ROLL_NEW_VERSION";
|
||||
public static final String GET_KEYS = "GET_KEYS";
|
||||
public static final String GET_KEYS_METADATA = "GET_KEYS_METADATA";
|
||||
public static final String GET_KEY_VERSIONS = "GET_KEY_VERSIONS";
|
||||
public static final String GET_METADATA = "GET_METADATA";
|
||||
|
||||
public static final String GET_KEY_VERSION = "GET_KEY_VERSION";
|
||||
public static final String GET_CURRENT_KEY = "GET_CURRENT_KEY";
|
||||
public static final String GENERATE_EEK = "GENERATE_EEK";
|
||||
public static final String DECRYPT_EEK = "DECRYPT_EEK";
|
||||
|
||||
private KeyProviderCryptoExtension provider;
|
||||
private KMSAudit kmsAudit;
|
||||
|
||||
public KMS() throws Exception {
|
||||
provider = KMSWebApp.getKeyProvider();
|
||||
kmsAudit= KMSWebApp.getKMSAudit();
|
||||
}
|
||||
|
||||
private static Principal getPrincipal(SecurityContext securityContext)
|
||||
|
@ -86,13 +89,26 @@ public class KMS {
|
|||
return user;
|
||||
}
|
||||
|
||||
private static void assertAccess(KMSACLs.Type aclType, Principal principal,
|
||||
|
||||
private static final String UNAUTHORIZED_MSG_WITH_KEY =
|
||||
"User:{0} not allowed to do ''{1}'' on ''{2}''";
|
||||
|
||||
private static final String UNAUTHORIZED_MSG_WITHOUT_KEY =
|
||||
"User:{0} not allowed to do ''{1}''";
|
||||
|
||||
private void assertAccess(KMSACLs.Type aclType, Principal principal,
|
||||
String operation) throws AccessControlException {
|
||||
assertAccess(aclType, principal, operation, null);
|
||||
}
|
||||
|
||||
private void assertAccess(KMSACLs.Type aclType, Principal principal,
|
||||
String operation, String key) throws AccessControlException {
|
||||
if (!KMSWebApp.getACLs().hasAccess(aclType, principal.getName())) {
|
||||
KMSWebApp.getUnauthorizedCallsMeter().mark();
|
||||
KMSAudit.unauthorized(principal, operation, key);
|
||||
kmsAudit.unauthorized(principal, operation, key);
|
||||
throw new AuthorizationException(MessageFormat.format(
|
||||
"User:{0} not allowed to do ''{1}'' on ''{2}''",
|
||||
(key != null) ? UNAUTHORIZED_MSG_WITH_KEY
|
||||
: UNAUTHORIZED_MSG_WITHOUT_KEY,
|
||||
principal.getName(), operation, key));
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +165,7 @@ public class KMS {
|
|||
|
||||
provider.flush();
|
||||
|
||||
KMSAudit.ok(user, CREATE_KEY, name, "UserProvidedMaterial:" +
|
||||
kmsAudit.ok(user, CREATE_KEY, name, "UserProvidedMaterial:" +
|
||||
(material != null) + " Description:" + description);
|
||||
|
||||
if (!KMSWebApp.getACLs().hasAccess(KMSACLs.Type.GET, user.getName())) {
|
||||
|
@ -175,7 +191,7 @@ public class KMS {
|
|||
provider.deleteKey(name);
|
||||
provider.flush();
|
||||
|
||||
KMSAudit.ok(user, DELETE_KEY, name, "");
|
||||
kmsAudit.ok(user, DELETE_KEY, name, "");
|
||||
|
||||
return Response.ok().build();
|
||||
}
|
||||
|
@ -203,7 +219,7 @@ public class KMS {
|
|||
|
||||
provider.flush();
|
||||
|
||||
KMSAudit.ok(user, ROLL_NEW_VERSION, name, "UserProvidedMaterial:" +
|
||||
kmsAudit.ok(user, ROLL_NEW_VERSION, name, "UserProvidedMaterial:" +
|
||||
(material != null) + " NewVersion:" + keyVersion.getVersionName());
|
||||
|
||||
if (!KMSWebApp.getACLs().hasAccess(KMSACLs.Type.GET, user.getName())) {
|
||||
|
@ -222,11 +238,10 @@ public class KMS {
|
|||
KMSWebApp.getAdminCallsMeter().mark();
|
||||
Principal user = getPrincipal(securityContext);
|
||||
String[] keyNames = keyNamesList.toArray(new String[keyNamesList.size()]);
|
||||
String names = StringUtils.arrayToString(keyNames);
|
||||
assertAccess(KMSACLs.Type.GET_METADATA, user, GET_KEYS_METADATA, names);
|
||||
assertAccess(KMSACLs.Type.GET_METADATA, user, GET_KEYS_METADATA);
|
||||
KeyProvider.Metadata[] keysMeta = provider.getKeysMetadata(keyNames);
|
||||
Object json = KMSServerJSONUtils.toJSON(keyNames, keysMeta);
|
||||
KMSAudit.ok(user, GET_KEYS_METADATA, names, "");
|
||||
kmsAudit.ok(user, GET_KEYS_METADATA, "");
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
@ -237,9 +252,9 @@ public class KMS {
|
|||
throws Exception {
|
||||
KMSWebApp.getAdminCallsMeter().mark();
|
||||
Principal user = getPrincipal(securityContext);
|
||||
assertAccess(KMSACLs.Type.GET_KEYS, user, GET_KEYS, "*");
|
||||
assertAccess(KMSACLs.Type.GET_KEYS, user, GET_KEYS);
|
||||
Object json = provider.getKeys();
|
||||
KMSAudit.ok(user, GET_KEYS, "*", "");
|
||||
kmsAudit.ok(user, GET_KEYS, "");
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
@ -263,7 +278,7 @@ public class KMS {
|
|||
KMSWebApp.getAdminCallsMeter().mark();
|
||||
assertAccess(KMSACLs.Type.GET_METADATA, user, GET_METADATA, name);
|
||||
Object json = KMSServerJSONUtils.toJSON(name, provider.getMetadata(name));
|
||||
KMSAudit.ok(user, GET_METADATA, name, "");
|
||||
kmsAudit.ok(user, GET_METADATA, name, "");
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
@ -279,7 +294,7 @@ public class KMS {
|
|||
KMSWebApp.getKeyCallsMeter().mark();
|
||||
assertAccess(KMSACLs.Type.GET, user, GET_CURRENT_KEY, name);
|
||||
Object json = KMSServerJSONUtils.toJSON(provider.getCurrentKey(name));
|
||||
KMSAudit.ok(user, GET_CURRENT_KEY, name, "");
|
||||
kmsAudit.ok(user, GET_CURRENT_KEY, name, "");
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
@ -292,9 +307,12 @@ public class KMS {
|
|||
Principal user = getPrincipal(securityContext);
|
||||
KMSClientProvider.checkNotEmpty(versionName, "versionName");
|
||||
KMSWebApp.getKeyCallsMeter().mark();
|
||||
assertAccess(KMSACLs.Type.GET, user, GET_KEY_VERSION, versionName);
|
||||
Object json = KMSServerJSONUtils.toJSON(provider.getKeyVersion(versionName));
|
||||
KMSAudit.ok(user, GET_KEY_VERSION, versionName, "");
|
||||
KeyVersion keyVersion = provider.getKeyVersion(versionName);
|
||||
assertAccess(KMSACLs.Type.GET, user, GET_KEY_VERSION);
|
||||
if (keyVersion != null) {
|
||||
kmsAudit.ok(user, GET_KEY_VERSION, keyVersion.getName(), "");
|
||||
}
|
||||
Object json = KMSServerJSONUtils.toJSON(keyVersion);
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
@ -327,7 +345,7 @@ public class KMS {
|
|||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
KMSAudit.ok(user, GENERATE_EEK, name, "");
|
||||
kmsAudit.ok(user, GENERATE_EEK, name, "");
|
||||
retJSON = new ArrayList();
|
||||
for (EncryptedKeyVersion edek : retEdeks) {
|
||||
((ArrayList)retJSON).add(KMSServerJSONUtils.toJSON(edek));
|
||||
|
@ -362,7 +380,7 @@ public class KMS {
|
|||
(String) jsonPayload.get(KMSRESTConstants.MATERIAL_FIELD);
|
||||
Object retJSON;
|
||||
if (eekOp.equals(KMSRESTConstants.EEK_DECRYPT)) {
|
||||
assertAccess(KMSACLs.Type.DECRYPT_EEK, user, DECRYPT_EEK, versionName);
|
||||
assertAccess(KMSACLs.Type.DECRYPT_EEK, user, DECRYPT_EEK, keyName);
|
||||
KMSClientProvider.checkNotNull(ivStr, KMSRESTConstants.IV_FIELD);
|
||||
byte[] iv = Base64.decodeBase64(ivStr);
|
||||
KMSClientProvider.checkNotNull(encMaterialStr,
|
||||
|
@ -373,7 +391,7 @@ public class KMS {
|
|||
new KMSClientProvider.KMSEncryptedKeyVersion(keyName, versionName,
|
||||
iv, KeyProviderCryptoExtension.EEK, encMaterial));
|
||||
retJSON = KMSServerJSONUtils.toJSON(retKeyVersion);
|
||||
KMSAudit.ok(user, DECRYPT_EEK, versionName, "");
|
||||
kmsAudit.ok(user, DECRYPT_EEK, keyName, "");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Wrong " + KMSRESTConstants.EEK_OP +
|
||||
" value, it must be " + KMSRESTConstants.EEK_GENERATE + " or " +
|
||||
|
@ -396,7 +414,7 @@ public class KMS {
|
|||
KMSWebApp.getKeyCallsMeter().mark();
|
||||
assertAccess(KMSACLs.Type.GET, user, GET_KEY_VERSIONS, name);
|
||||
Object json = KMSServerJSONUtils.toJSON(provider.getKeyVersions(name));
|
||||
KMSAudit.ok(user, GET_KEY_VERSIONS, name, "");
|
||||
kmsAudit.ok(user, GET_KEY_VERSIONS, name, "");
|
||||
return Response.ok().type(MediaType.APPLICATION_JSON).entity(json).build();
|
||||
}
|
||||
|
||||
|
|
|
@ -20,43 +20,202 @@ package org.apache.hadoop.crypto.key.kms.server;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import java.security.Principal;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Provides convenience methods for audit logging consistently the different
|
||||
* types of events.
|
||||
*/
|
||||
public class KMSAudit {
|
||||
|
||||
private static class AuditEvent {
|
||||
private final AtomicLong accessCount = new AtomicLong(-1);
|
||||
private final String keyName;
|
||||
private final String user;
|
||||
private final String op;
|
||||
private final String extraMsg;
|
||||
private final long startTime = System.currentTimeMillis();
|
||||
|
||||
private AuditEvent(String keyName, String user, String op, String msg) {
|
||||
this.keyName = keyName;
|
||||
this.user = user;
|
||||
this.op = op;
|
||||
this.extraMsg = msg;
|
||||
}
|
||||
|
||||
public String getExtraMsg() {
|
||||
return extraMsg;
|
||||
}
|
||||
|
||||
public AtomicLong getAccessCount() {
|
||||
return accessCount;
|
||||
}
|
||||
|
||||
public String getKeyName() {
|
||||
return keyName;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String getOp() {
|
||||
return op;
|
||||
}
|
||||
|
||||
public long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
}
|
||||
|
||||
public static enum OpStatus {
|
||||
OK, UNAUTHORIZED, UNAUTHENTICATED, ERROR;
|
||||
}
|
||||
|
||||
private static Set<String> AGGREGATE_OPS_WHITELIST = Sets.newHashSet(
|
||||
KMS.GET_KEY_VERSION, KMS.GET_CURRENT_KEY, KMS.DECRYPT_EEK, KMS.GENERATE_EEK
|
||||
);
|
||||
|
||||
private Cache<String, AuditEvent> cache;
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
public static final String KMS_LOGGER_NAME = "kms-audit";
|
||||
|
||||
private static Logger AUDIT_LOG = LoggerFactory.getLogger(KMS_LOGGER_NAME);
|
||||
|
||||
private static void op(String status, String op, Principal user, String key,
|
||||
String extraMsg) {
|
||||
AUDIT_LOG.info("Status:{} User:{} Op:{} Name:{}{}", status, user.getName(),
|
||||
op, key, extraMsg);
|
||||
KMSAudit(long delay) {
|
||||
cache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(delay, TimeUnit.MILLISECONDS)
|
||||
.removalListener(
|
||||
new RemovalListener<String, AuditEvent>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<String, AuditEvent> entry) {
|
||||
AuditEvent event = entry.getValue();
|
||||
if (event.getAccessCount().get() > 0) {
|
||||
KMSAudit.this.logEvent(event);
|
||||
event.getAccessCount().set(0);
|
||||
KMSAudit.this.cache.put(entry.getKey(), event);
|
||||
}
|
||||
}
|
||||
}).build();
|
||||
executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
|
||||
.setDaemon(true).setNameFormat(KMS_LOGGER_NAME + "_thread").build());
|
||||
executor.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cache.cleanUp();
|
||||
}
|
||||
}, delay / 10, delay / 10, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public static void ok(Principal user, String op, String key,
|
||||
String extraMsg) {
|
||||
op("OK", op, user, key, extraMsg);
|
||||
}
|
||||
|
||||
public static void unauthorized(Principal user, String op, String key) {
|
||||
op("UNAUTHORIZED", op, user, key, "");
|
||||
}
|
||||
|
||||
public static void error(Principal user, String method, String url,
|
||||
String extraMsg) {
|
||||
AUDIT_LOG.info("Status:ERROR User:{} Method:{} URL:{} Exception:'{}'",
|
||||
user.getName(), method, url, extraMsg);
|
||||
}
|
||||
|
||||
public static void unauthenticated(String remoteHost, String method,
|
||||
String url, String extraMsg) {
|
||||
private void logEvent(AuditEvent event) {
|
||||
AUDIT_LOG.info(
|
||||
"Status:UNAUTHENTICATED RemoteHost:{} Method:{} URL:{} ErrorMsg:'{}'",
|
||||
remoteHost, method, url, extraMsg);
|
||||
"OK[op={}, key={}, user={}, accessCount={}, interval={}ms] {}",
|
||||
event.getOp(), event.getKeyName(), event.getUser(),
|
||||
event.getAccessCount().get(),
|
||||
(System.currentTimeMillis() - event.getStartTime()),
|
||||
event.getExtraMsg());
|
||||
}
|
||||
|
||||
private void op(OpStatus opStatus, final String op, final String user,
|
||||
final String key, final String extraMsg) {
|
||||
if (!Strings.isNullOrEmpty(user) && !Strings.isNullOrEmpty(key)
|
||||
&& !Strings.isNullOrEmpty(op)
|
||||
&& AGGREGATE_OPS_WHITELIST.contains(op)) {
|
||||
String cacheKey = createCacheKey(user, key, op);
|
||||
if (opStatus == OpStatus.UNAUTHORIZED) {
|
||||
cache.invalidate(cacheKey);
|
||||
AUDIT_LOG.info("UNAUTHORIZED[op={}, key={}, user={}] {}", op, key, user,
|
||||
extraMsg);
|
||||
} else {
|
||||
try {
|
||||
AuditEvent event = cache.get(cacheKey, new Callable<AuditEvent>() {
|
||||
@Override
|
||||
public AuditEvent call() throws Exception {
|
||||
return new AuditEvent(key, user, op, extraMsg);
|
||||
}
|
||||
});
|
||||
// Log first access (initialized as -1 so
|
||||
// incrementAndGet() == 0 implies first access)
|
||||
if (event.getAccessCount().incrementAndGet() == 0) {
|
||||
event.getAccessCount().incrementAndGet();
|
||||
logEvent(event);
|
||||
}
|
||||
} catch (ExecutionException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
List<String> kvs = new LinkedList<String>();
|
||||
if (!Strings.isNullOrEmpty(op)) {
|
||||
kvs.add("op=" + op);
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(key)) {
|
||||
kvs.add("key=" + key);
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(user)) {
|
||||
kvs.add("user=" + user);
|
||||
}
|
||||
if (kvs.size() == 0) {
|
||||
AUDIT_LOG.info("{} {}", opStatus.toString(), extraMsg);
|
||||
} else {
|
||||
String join = Joiner.on(", ").join(kvs);
|
||||
AUDIT_LOG.info("{}[{}] {}", opStatus.toString(), join, extraMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void ok(Principal user, String op, String key,
|
||||
String extraMsg) {
|
||||
op(OpStatus.OK, op, user.getName(), key, extraMsg);
|
||||
}
|
||||
|
||||
public void ok(Principal user, String op, String extraMsg) {
|
||||
op(OpStatus.OK, op, user.getName(), null, extraMsg);
|
||||
}
|
||||
|
||||
public void unauthorized(Principal user, String op, String key) {
|
||||
op(OpStatus.UNAUTHORIZED, op, user.getName(), key, "");
|
||||
}
|
||||
|
||||
public void error(Principal user, String method, String url,
|
||||
String extraMsg) {
|
||||
op(OpStatus.ERROR, null, user.getName(), null, "Method:'" + method
|
||||
+ "' Exception:'" + extraMsg + "'");
|
||||
}
|
||||
|
||||
public void unauthenticated(String remoteHost, String method,
|
||||
String url, String extraMsg) {
|
||||
op(OpStatus.UNAUTHENTICATED, null, null, null, "RemoteHost:"
|
||||
+ remoteHost + " Method:" + method
|
||||
+ " URL:" + url + " ErrorMsg:'" + extraMsg + "'");
|
||||
}
|
||||
|
||||
private static String createCacheKey(String user, String key, String op) {
|
||||
return user + "#" + key + "#" + op;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,8 +115,10 @@ public class KMSAuthenticationFilter extends AuthenticationFilter {
|
|||
if (queryString != null) {
|
||||
requestURL.append("?").append(queryString);
|
||||
}
|
||||
KMSAudit.unauthenticated(request.getRemoteHost(), method,
|
||||
requestURL.toString(), kmsResponse.msg);
|
||||
|
||||
KMSWebApp.getKMSAudit().unauthenticated(
|
||||
request.getRemoteHost(), method, requestURL.toString(),
|
||||
kmsResponse.msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,12 +43,17 @@ public class KMSConfiguration {
|
|||
// TImeout for the Current Key cache
|
||||
public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
|
||||
"current.key.cache.timeout.ms";
|
||||
// Delay for Audit logs that need aggregation
|
||||
public static final String KMS_AUDIT_AGGREGATION_DELAY = CONFIG_PREFIX +
|
||||
"aggregation.delay.ms";
|
||||
|
||||
public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
|
||||
// 10 mins
|
||||
public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 60 * 1000;
|
||||
// 30 secs
|
||||
public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
|
||||
// 10 secs
|
||||
public static final long KMS_AUDIT_AGGREGATION_DELAY_DEFAULT = 10000;
|
||||
|
||||
static Configuration getConfiguration(boolean loadHadoopDefaults,
|
||||
String ... resources) {
|
||||
|
|
|
@ -20,9 +20,11 @@ package org.apache.hadoop.crypto.key.kms.server;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import com.sun.jersey.api.container.ContainerException;
|
||||
|
||||
import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -30,6 +32,7 @@ import javax.ws.rs.core.MediaType;
|
|||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.ext.ExceptionMapper;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.Principal;
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -83,6 +86,10 @@ public class KMSExceptionsProvider implements ExceptionMapper<Exception> {
|
|||
status = Response.Status.FORBIDDEN;
|
||||
// we don't audit here because we did it already when checking access
|
||||
doAudit = false;
|
||||
} else if (throwable instanceof AuthorizationException) {
|
||||
status = Response.Status.UNAUTHORIZED;
|
||||
// we don't audit here because we did it already when checking access
|
||||
doAudit = false;
|
||||
} else if (throwable instanceof AccessControlException) {
|
||||
status = Response.Status.FORBIDDEN;
|
||||
} else if (exception instanceof IOException) {
|
||||
|
@ -95,7 +102,8 @@ public class KMSExceptionsProvider implements ExceptionMapper<Exception> {
|
|||
status = Response.Status.INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
if (doAudit) {
|
||||
KMSAudit.error(KMSMDCFilter.getPrincipal(), KMSMDCFilter.getMethod(),
|
||||
KMSWebApp.getKMSAudit().error(KMSMDCFilter.getPrincipal(),
|
||||
KMSMDCFilter.getMethod(),
|
||||
KMSMDCFilter.getURL(), getOneLineMessage(exception));
|
||||
}
|
||||
return createResponse(status, throwable);
|
||||
|
|
|
@ -76,6 +76,7 @@ public class KMSWebApp implements ServletContextListener {
|
|||
private static Meter decryptEEKCallsMeter;
|
||||
private static Meter generateEEKCallsMeter;
|
||||
private static Meter invalidCallsMeter;
|
||||
private static KMSAudit kmsAudit;
|
||||
private static KeyProviderCryptoExtension keyProviderCryptoExtension;
|
||||
|
||||
static {
|
||||
|
@ -144,6 +145,11 @@ public class KMSWebApp implements ServletContextListener {
|
|||
unauthenticatedCallsMeter = metricRegistry.register(
|
||||
UNAUTHENTICATED_CALLS_METER, new Meter());
|
||||
|
||||
kmsAudit =
|
||||
new KMSAudit(kmsConf.getLong(
|
||||
KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY,
|
||||
KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY_DEFAULT));
|
||||
|
||||
// this is required for the the JMXJsonServlet to work properly.
|
||||
// the JMXJsonServlet is behind the authentication filter,
|
||||
// thus the '*' ACL.
|
||||
|
@ -199,6 +205,7 @@ public class KMSWebApp implements ServletContextListener {
|
|||
|
||||
@Override
|
||||
public void contextDestroyed(ServletContextEvent sce) {
|
||||
kmsAudit.shutdown();
|
||||
acls.stopReloader();
|
||||
jmxReporter.stop();
|
||||
jmxReporter.close();
|
||||
|
@ -245,4 +252,8 @@ public class KMSWebApp implements ServletContextListener {
|
|||
public static KeyProviderCryptoExtension getKeyProvider() {
|
||||
return keyProviderCryptoExtension;
|
||||
}
|
||||
|
||||
public static KMSAudit getKMSAudit() {
|
||||
return kmsAudit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,25 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
|
|||
</property>
|
||||
+---+
|
||||
|
||||
** KMS Aggregated Audit logs
|
||||
|
||||
Audit logs are aggregated for API accesses to the GET_KEY_VERSION,
|
||||
GET_CURRENT_KEY, DECRYPT_EEK, GENERATE_EEK operations.
|
||||
|
||||
Entries are grouped by the (user,key,operation) combined key for a configurable
|
||||
aggregation interval after which the number of accesses to the specified
|
||||
end-point by the user for a given key is flushed to the audit log.
|
||||
|
||||
The Aggregation interval is configured via the property :
|
||||
|
||||
+---+
|
||||
<property>
|
||||
<name>hadoop.kms.aggregation.delay.ms</name>
|
||||
<value>10000</value>
|
||||
</property>
|
||||
+---+
|
||||
|
||||
|
||||
** Start/Stop the KMS
|
||||
|
||||
To start/stop KMS use KMS's bin/kms.sh script. For example:
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms.server;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.FilterOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.security.Principal;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.PropertyConfigurator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestKMSAudit {
|
||||
|
||||
private PrintStream originalOut;
|
||||
private ByteArrayOutputStream memOut;
|
||||
private FilterOut filterOut;
|
||||
private PrintStream capturedOut;
|
||||
|
||||
private KMSAudit kmsAudit;
|
||||
|
||||
private static class FilterOut extends FilterOutputStream {
|
||||
public FilterOut(OutputStream out) {
|
||||
super(out);
|
||||
}
|
||||
|
||||
public void setOutputStream(OutputStream out) {
|
||||
this.out = out;
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
originalOut = System.err;
|
||||
memOut = new ByteArrayOutputStream();
|
||||
filterOut = new FilterOut(memOut);
|
||||
capturedOut = new PrintStream(filterOut);
|
||||
System.setErr(capturedOut);
|
||||
PropertyConfigurator.configure(Thread.currentThread().
|
||||
getContextClassLoader()
|
||||
.getResourceAsStream("log4j-kmsaudit.properties"));
|
||||
this.kmsAudit = new KMSAudit(1000);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
System.setErr(originalOut);
|
||||
LogManager.resetConfiguration();
|
||||
kmsAudit.shutdown();
|
||||
}
|
||||
|
||||
private String getAndResetLogOutput() {
|
||||
capturedOut.flush();
|
||||
String logOutput = new String(memOut.toByteArray());
|
||||
memOut = new ByteArrayOutputStream();
|
||||
filterOut.setOutputStream(memOut);
|
||||
return logOutput;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregation() throws Exception {
|
||||
Principal luser = Mockito.mock(Principal.class);
|
||||
Mockito.when(luser.getName()).thenReturn("luser");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DELETE_KEY, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.ROLL_NEW_VERSION, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
Thread.sleep(1500);
|
||||
kmsAudit.ok(luser, KMS.DECRYPT_EEK, "k1", "testmsg");
|
||||
Thread.sleep(1500);
|
||||
String out = getAndResetLogOutput();
|
||||
System.out.println(out);
|
||||
Assert.assertTrue(
|
||||
out.matches(
|
||||
"OK\\[op=DECRYPT_EEK, key=k1, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"
|
||||
// Not aggregated !!
|
||||
+ "OK\\[op=DELETE_KEY, key=k1, user=luser\\] testmsg"
|
||||
+ "OK\\[op=ROLL_NEW_VERSION, key=k1, user=luser\\] testmsg"
|
||||
// Aggregated
|
||||
+ "OK\\[op=DECRYPT_EEK, key=k1, user=luser, accessCount=6, interval=[^m]{1,4}ms\\] testmsg"
|
||||
+ "OK\\[op=DECRYPT_EEK, key=k1, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregationUnauth() throws Exception {
|
||||
Principal luser = Mockito.mock(Principal.class);
|
||||
Mockito.when(luser.getName()).thenReturn("luser");
|
||||
kmsAudit.unauthorized(luser, KMS.GENERATE_EEK, "k2");
|
||||
Thread.sleep(1000);
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
kmsAudit.unauthorized(luser, KMS.GENERATE_EEK, "k3");
|
||||
kmsAudit.ok(luser, KMS.GENERATE_EEK, "k3", "testmsg");
|
||||
Thread.sleep(2000);
|
||||
String out = getAndResetLogOutput();
|
||||
System.out.println(out);
|
||||
Assert.assertTrue(
|
||||
out.matches(
|
||||
"UNAUTHORIZED\\[op=GENERATE_EEK, key=k2, user=luser\\] "
|
||||
+ "OK\\[op=GENERATE_EEK, key=k3, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"
|
||||
+ "OK\\[op=GENERATE_EEK, key=k3, user=luser, accessCount=5, interval=[^m]{1,4}ms\\] testmsg"
|
||||
+ "UNAUTHORIZED\\[op=GENERATE_EEK, key=k3, user=luser\\] "
|
||||
+ "OK\\[op=GENERATE_EEK, key=k3, user=luser, accessCount=1, interval=[^m]{1,4}ms\\] testmsg"));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# LOG Appender
|
||||
log4j.appender.kms-audit=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.kms-audit.Target=System.err
|
||||
log4j.appender.kms-audit.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.kms-audit.layout.ConversionPattern=%m
|
||||
|
||||
log4j.rootLogger=INFO, kms-audit
|
Loading…
Reference in New Issue