svn merge -c 1560761 FIXES: HDFS-5241. Provide alternate queuing audit logger to reduce logging contention (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1560763 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
96a9a59822
commit
7bbd687d4d
|
@ -221,6 +221,9 @@ Release 2.4.0 - UNRELEASED
|
|||
|
||||
HDFS-5681. renewLease should not hold fsn write lock. (daryn via Kihwal)
|
||||
|
||||
HDFS-5241. Provide alternate queuing audit logger to reduce logging
|
||||
contention (daryn)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
|
||||
|
|
|
@ -304,6 +304,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
|
||||
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
|
||||
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
|
||||
|
||||
// Much code in hdfs is not yet updated to use these keys.
|
||||
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
|
||||
|
|
|
@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
|
||||
|
@ -122,6 +124,7 @@ import javax.management.StandardMBean;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -244,6 +247,9 @@ import org.apache.hadoop.util.DataChecksum;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.AsyncAppender;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -627,6 +633,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
*/
|
||||
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
|
||||
throws IOException {
|
||||
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
|
||||
DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
|
||||
LOG.info("Enabling async auditlog");
|
||||
enableAsyncAuditLog();
|
||||
}
|
||||
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
|
||||
LOG.info("fsLock is fair:" + fair);
|
||||
fsLock = new FSNamesystemLock(fair);
|
||||
|
@ -7373,5 +7384,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
auditLog.info(message);
|
||||
}
|
||||
}
|
||||
|
||||
private static void enableAsyncAuditLog() {
|
||||
if (!(auditLog instanceof Log4JLogger)) {
|
||||
LOG.warn("Log4j is required to enable async auditlog");
|
||||
return;
|
||||
}
|
||||
Logger logger = ((Log4JLogger)auditLog).getLogger();
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Appender> appenders = Collections.list(logger.getAllAppenders());
|
||||
// failsafe against trying to async it more than once
|
||||
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
|
||||
AsyncAppender asyncAppender = new AsyncAppender();
|
||||
// change logger to have an async appender containing all the
|
||||
// previously configured appenders
|
||||
for (Appender appender : appenders) {
|
||||
logger.removeAppender(appender);
|
||||
asyncAppender.addAppender(appender);
|
||||
}
|
||||
logger.addAppender(asyncAppender);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,16 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
|
@ -46,6 +47,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
|||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.AsyncAppender;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -54,13 +57,30 @@ import org.apache.log4j.RollingFileAppender;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
/**
|
||||
* A JUnit test that audit logs are generated
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestAuditLogs {
|
||||
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
|
||||
boolean useAsyncLog;
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
Collection<Object[]> params = new ArrayList<Object[]>();
|
||||
params.add(new Object[]{new Boolean(false)});
|
||||
params.add(new Object[]{new Boolean(true)});
|
||||
return params;
|
||||
}
|
||||
|
||||
public TestAuditLogs(boolean useAsyncLog) {
|
||||
this.useAsyncLog = useAsyncLog;
|
||||
}
|
||||
|
||||
// Pattern for:
|
||||
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
|
||||
static final Pattern auditPattern = Pattern.compile(
|
||||
|
@ -84,17 +104,28 @@ public class TestAuditLogs {
|
|||
|
||||
@Before
|
||||
public void setupCluster() throws Exception {
|
||||
// must configure prior to instantiating the namesystem because it
|
||||
// will reconfigure the logger if async is enabled
|
||||
configureAuditLogs();
|
||||
conf = new HdfsConfiguration();
|
||||
final long precision = 1L;
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
|
||||
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
|
||||
setNumFiles(20).build();
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
|
||||
fs = cluster.getFileSystem();
|
||||
util.createFiles(fs, fileName);
|
||||
|
||||
// make sure the appender is what it's supposed to be
|
||||
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Appender> appenders = Collections.list(logger.getAllAppenders());
|
||||
assertEquals(1, appenders.size());
|
||||
assertEquals(useAsyncLog, appenders.get(0) instanceof AsyncAppender);
|
||||
|
||||
fnames = util.getFileNames(fileName);
|
||||
util.waitReplication(fs, fileName, (short)3);
|
||||
userGroupInfo = UserGroupInformation.createUserForTesting(username, groups);
|
||||
|
@ -203,6 +234,7 @@ public class TestAuditLogs {
|
|||
try {
|
||||
hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(conf);
|
||||
InputStream istream = hftpFs.open(file);
|
||||
@SuppressWarnings("unused")
|
||||
int val = istream.read();
|
||||
istream.close();
|
||||
|
||||
|
@ -234,6 +266,12 @@ public class TestAuditLogs {
|
|||
|
||||
/** Sets up log4j logger for auditlogs */
|
||||
private void setupAuditLogs() throws IOException {
|
||||
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
|
||||
// enable logging now that the test is ready to run
|
||||
logger.setLevel(Level.INFO);
|
||||
}
|
||||
|
||||
private void configureAuditLogs() throws IOException {
|
||||
// Shutdown the LogManager to release all logger open file handles.
|
||||
// Unfortunately, Apache commons logging library does not provide
|
||||
// means to release underlying loggers. For additional info look up
|
||||
|
@ -245,7 +283,8 @@ public class TestAuditLogs {
|
|||
assertTrue(file.delete());
|
||||
}
|
||||
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
|
||||
logger.setLevel(Level.INFO);
|
||||
// disable logging while the cluster startup preps files
|
||||
logger.setLevel(Level.OFF);
|
||||
PatternLayout layout = new PatternLayout("%m%n");
|
||||
RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
|
||||
logger.addAppender(appender);
|
||||
|
|
Loading…
Reference in New Issue