HDFS-7218. FSNamesystem ACL operations should write to audit log on failure. (clamb via yliu)

This commit is contained in:
yliu 2014-11-05 15:55:56 +08:00
parent d57dfac1ea
commit 3faabf2052
3 changed files with 123 additions and 1 deletions

View File

@ -610,6 +610,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11233. hadoop.security.kms.client.encrypted.key.cache.expiry
property spelled wrong in core-default. (Stephen Chu via yliu)
HDFS-7218. FSNamesystem ACL operations should write to audit log on
failure. (clamb via yliu)
BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)

View File

@ -7851,6 +7851,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public FSDirectory getFSDirectory() {
return dir;
}
/** Set the FSDirectory. */
@VisibleForTesting
public void setFSDirectory(FSDirectory dir) {
this.dir = dir;
}
/** @return the cache manager. */
public CacheManager getCacheManager() {
return cacheManager;
@ -8717,6 +8722,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "modifyAclEntries", srcArg);
throw e;
} finally {
writeUnlock();
}
@ -8741,6 +8749,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "removeAclEntries", srcArg);
throw e;
} finally {
writeUnlock();
}
@ -8764,6 +8775,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
List<AclEntry> newAcl = dir.removeDefaultAcl(src);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "removeDefaultAcl", srcArg);
throw e;
} finally {
writeUnlock();
}
@ -8787,6 +8801,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.removeAcl(src);
getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "removeAcl", srcArg);
throw e;
} finally {
writeUnlock();
}
@ -8810,6 +8827,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {
logAuditEvent(false, "setAcl", srcArg);
throw e;
} finally {
writeUnlock();
}
@ -8822,6 +8842,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean success = false;
readLock();
try {
checkOperation(OperationCategory.READ);
@ -8829,9 +8850,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null);
}
return dir.getAclStatus(src);
final AclStatus ret = dir.getAclStatus(src);
success = true;
return ret;
} finally {
readUnlock();
logAuditEvent(success, "getAclStatus", src);
}
}

View File

@ -19,30 +19,39 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.ProxyServers;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests for the {@link AuditLogger} custom audit logging interface.
@ -166,6 +175,87 @@ public class TestAuditLogger {
}
}
@Test
public void testAuditLogWithAclFailure() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
DummyAuditLogger.class.getName());
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitClusterUp();
final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
// Set up mock FSDirectory to test FSN audit logging during failure
final FSDirectory mockedDir = Mockito.spy(dir);
Mockito.doThrow(new AccessControlException("mock setAcl exception")).
when(mockedDir).
setAcl(anyString(), anyListOf(AclEntry.class));
Mockito.doThrow(new AccessControlException("mock getAclStatus exception")).
when(mockedDir).
getAclStatus(anyString());
Mockito.doThrow(new AccessControlException("mock removeAcl exception")).
when(mockedDir).
removeAcl(anyString());
Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")).
when(mockedDir).
removeDefaultAcl(anyString());
Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")).
when(mockedDir).
removeAclEntries(anyString(), anyListOf(AclEntry.class));
Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")).
when(mockedDir).
modifyAclEntries(anyString(), anyListOf(AclEntry.class));
// Replace the FSD with the mock FSD.
cluster.getNamesystem().setFSDirectory(mockedDir);
assertTrue(DummyAuditLogger.initialized);
DummyAuditLogger.resetLogCount();
final FileSystem fs = cluster.getFileSystem();
final Path p = new Path("/");
final List<AclEntry> acls = Lists.newArrayList();
try {
fs.getAclStatus(p);
} catch (AccessControlException e) {
assertExceptionContains("mock getAclStatus exception", e);
}
try {
fs.setAcl(p, acls);
} catch (AccessControlException e) {
assertExceptionContains("mock setAcl exception", e);
}
try {
fs.removeAcl(p);
} catch (AccessControlException e) {
assertExceptionContains("mock removeAcl exception", e);
}
try {
fs.removeDefaultAcl(p);
} catch (AccessControlException e) {
assertExceptionContains("mock removeDefaultAcl exception", e);
}
try {
fs.removeAclEntries(p, acls);
} catch (AccessControlException e) {
assertExceptionContains("mock removeAclEntries exception", e);
}
try {
fs.modifyAclEntries(p, acls);
} catch (AccessControlException e) {
assertExceptionContains("mock modifyAclEntries exception", e);
}
assertEquals(6, DummyAuditLogger.logCount);
assertEquals(6, DummyAuditLogger.unsuccessfulCount);
} finally {
cluster.shutdown();
}
}
/**
* Tests that a broken audit logger causes requests to fail.
*/
@ -194,6 +284,7 @@ public class TestAuditLogger {
static boolean initialized;
static int logCount;
static int unsuccessfulCount;
static short foundPermission;
static String remoteAddr;
@ -203,6 +294,7 @@ public class TestAuditLogger {
public static void resetLogCount() {
logCount = 0;
unsuccessfulCount = 0;
}
public void logAuditEvent(boolean succeeded, String userName,
@ -210,6 +302,9 @@ public class TestAuditLogger {
FileStatus stat) {
remoteAddr = addr.getHostAddress();
logCount++;
if (!succeeded) {
unsuccessfulCount++;
}
if (stat != null) {
foundPermission = stat.getPermission().toShort();
}