HDFS-8213. DFSClient should use hdfs.client.htrace HTrace configuration prefix rather than hadoop.htrace (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-05-01 11:19:40 -07:00
parent 2d7363b273
commit b82567d455
11 changed files with 69 additions and 55 deletions

View File

@ -25,6 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
@ -52,41 +53,36 @@ import org.apache.htrace.Trace;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class SpanReceiverHost implements TraceAdminProtocol { public class SpanReceiverHost implements TraceAdminProtocol {
public static final String SPAN_RECEIVERS_CONF_KEY = public static final String SPAN_RECEIVERS_CONF_SUFFIX =
"hadoop.htrace.spanreceiver.classes"; "spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class); private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private static final HashMap<String, SpanReceiverHost> hosts =
new HashMap<String, SpanReceiverHost>(1);
private final TreeMap<Long, SpanReceiver> receivers = private final TreeMap<Long, SpanReceiver> receivers =
new TreeMap<Long, SpanReceiver>(); new TreeMap<Long, SpanReceiver>();
private final String confPrefix;
private Configuration config; private Configuration config;
private boolean closed = false; private boolean closed = false;
private long highestId = 1; private long highestId = 1;
private final static String LOCAL_FILE_SPAN_RECEIVER_PATH = private final static String LOCAL_FILE_SPAN_RECEIVER_PATH_SUFFIX =
"hadoop.htrace.local-file-span-receiver.path"; "local-file-span-receiver.path";
private static enum SingletonHolder { public static SpanReceiverHost get(Configuration conf, String confPrefix) {
INSTANCE; synchronized (SpanReceiverHost.class) {
Object lock = new Object(); SpanReceiverHost host = hosts.get(confPrefix);
SpanReceiverHost host = null; if (host != null) {
} return host;
public static SpanReceiverHost getInstance(Configuration conf) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
} }
SpanReceiverHost host = new SpanReceiverHost(); final SpanReceiverHost newHost = new SpanReceiverHost(confPrefix);
host.loadSpanReceivers(conf); newHost.loadSpanReceivers(conf);
SingletonHolder.INSTANCE.host = host;
ShutdownHookManager.get().addShutdownHook(new Runnable() { ShutdownHookManager.get().addShutdownHook(new Runnable() {
public void run() { public void run() {
SingletonHolder.INSTANCE.host.closeReceivers(); newHost.closeReceivers();
} }
}, 0); }, 0);
return SingletonHolder.INSTANCE.host; hosts.put(confPrefix, newHost);
return newHost;
} }
} }
@ -119,6 +115,10 @@ public class SpanReceiverHost implements TraceAdminProtocol {
return new File(tmp, nonce).getAbsolutePath(); return new File(tmp, nonce).getAbsolutePath();
} }
private SpanReceiverHost(String confPrefix) {
this.confPrefix = confPrefix;
}
/** /**
* Reads the names of classes specified in the * Reads the names of classes specified in the
* "hadoop.htrace.spanreceiver.classes" property and instantiates and registers * "hadoop.htrace.spanreceiver.classes" property and instantiates and registers
@ -131,22 +131,22 @@ public class SpanReceiverHost implements TraceAdminProtocol {
*/ */
public synchronized void loadSpanReceivers(Configuration conf) { public synchronized void loadSpanReceivers(Configuration conf) {
config = new Configuration(conf); config = new Configuration(conf);
String[] receiverNames = String receiverKey = confPrefix + SPAN_RECEIVERS_CONF_SUFFIX;
config.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY); String[] receiverNames = config.getTrimmedStrings(receiverKey);
if (receiverNames == null || receiverNames.length == 0) { if (receiverNames == null || receiverNames.length == 0) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("No span receiver names found in " + LOG.trace("No span receiver names found in " + receiverKey + ".");
SPAN_RECEIVERS_CONF_KEY + ".");
} }
return; return;
} }
// It's convenient to have each daemon log to a random trace file when // It's convenient to have each daemon log to a random trace file when
// testing. // testing.
if (config.get(LOCAL_FILE_SPAN_RECEIVER_PATH) == null) { String pathKey = confPrefix + LOCAL_FILE_SPAN_RECEIVER_PATH_SUFFIX;
if (config.get(pathKey) == null) {
String uniqueFile = getUniqueLocalTraceFileName(); String uniqueFile = getUniqueLocalTraceFileName();
config.set(LOCAL_FILE_SPAN_RECEIVER_PATH, uniqueFile); config.set(pathKey, uniqueFile);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Set " + LOCAL_FILE_SPAN_RECEIVER_PATH + " to " + uniqueFile); LOG.trace("Set " + pathKey + " to " + uniqueFile);
} }
} }
for (String className : receiverNames) { for (String className : receiverNames) {
@ -164,7 +164,8 @@ public class SpanReceiverHost implements TraceAdminProtocol {
private synchronized SpanReceiver loadInstance(String className, private synchronized SpanReceiver loadInstance(String className,
List<ConfigurationPair> extraConfig) throws IOException { List<ConfigurationPair> extraConfig) throws IOException {
SpanReceiverBuilder builder = SpanReceiverBuilder builder =
new SpanReceiverBuilder(TraceUtils.wrapHadoopConf(config, extraConfig)); new SpanReceiverBuilder(TraceUtils.
wrapHadoopConf(confPrefix, config, extraConfig));
SpanReceiver rcvr = builder.spanReceiverClass(className.trim()).build(); SpanReceiver rcvr = builder.spanReceiverClass(className.trim()).build();
if (rcvr == null) { if (rcvr == null) {
throw new IOException("Failed to load SpanReceiver " + className); throw new IOException("Failed to load SpanReceiver " + className);

View File

@ -31,15 +31,15 @@ import org.apache.htrace.HTraceConfiguration;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class TraceUtils { public class TraceUtils {
public static final String HTRACE_CONF_PREFIX = "hadoop.htrace.";
private static List<ConfigurationPair> EMPTY = Collections.emptyList(); private static List<ConfigurationPair> EMPTY = Collections.emptyList();
public static HTraceConfiguration wrapHadoopConf(final Configuration conf) { public static HTraceConfiguration wrapHadoopConf(final String prefix,
return wrapHadoopConf(conf, EMPTY); final Configuration conf) {
return wrapHadoopConf(prefix, conf, EMPTY);
} }
public static HTraceConfiguration wrapHadoopConf(final Configuration conf, public static HTraceConfiguration wrapHadoopConf(final String prefix,
List<ConfigurationPair> extraConfig) { final Configuration conf, List<ConfigurationPair> extraConfig) {
final HashMap<String, String> extraMap = new HashMap<String, String>(); final HashMap<String, String> extraMap = new HashMap<String, String>();
for (ConfigurationPair pair : extraConfig) { for (ConfigurationPair pair : extraConfig) {
extraMap.put(pair.getKey(), pair.getValue()); extraMap.put(pair.getKey(), pair.getValue());
@ -50,7 +50,7 @@ public class TraceUtils {
if (extraMap.containsKey(key)) { if (extraMap.containsKey(key)) {
return extraMap.get(key); return extraMap.get(key);
} }
return conf.get(HTRACE_CONF_PREFIX + key, ""); return conf.get(prefix + key, "");
} }
@Override @Override
@ -58,7 +58,7 @@ public class TraceUtils {
if (extraMap.containsKey(key)) { if (extraMap.containsKey(key)) {
return extraMap.get(key); return extraMap.get(key);
} }
return conf.get(HTRACE_CONF_PREFIX + key, defaultValue); return conf.get(prefix + key, defaultValue);
} }
}; };
} }

View File

@ -25,13 +25,15 @@ import org.apache.htrace.HTraceConfiguration;
import org.junit.Test; import org.junit.Test;
public class TestTraceUtils { public class TestTraceUtils {
private static String TEST_PREFIX = "test.prefix.htrace.";
@Test @Test
public void testWrappedHadoopConf() { public void testWrappedHadoopConf() {
String key = "sampler"; String key = "sampler";
String value = "ProbabilitySampler"; String value = "ProbabilitySampler";
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(TraceUtils.HTRACE_CONF_PREFIX + key, value); conf.set(TEST_PREFIX + key, value);
HTraceConfiguration wrapped = TraceUtils.wrapHadoopConf(conf); HTraceConfiguration wrapped = TraceUtils.wrapHadoopConf(TEST_PREFIX, conf);
assertEquals(value, wrapped.get(key)); assertEquals(value, wrapped.get(key));
} }
@ -41,11 +43,11 @@ public class TestTraceUtils {
String oldValue = "old value"; String oldValue = "old value";
String newValue = "new value"; String newValue = "new value";
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(TraceUtils.HTRACE_CONF_PREFIX + key, oldValue); conf.set(TEST_PREFIX + key, oldValue);
LinkedList<ConfigurationPair> extraConfig = LinkedList<ConfigurationPair> extraConfig =
new LinkedList<ConfigurationPair>(); new LinkedList<ConfigurationPair>();
extraConfig.add(new ConfigurationPair(key, newValue)); extraConfig.add(new ConfigurationPair(key, newValue));
HTraceConfiguration wrapped = TraceUtils.wrapHadoopConf(conf, extraConfig); HTraceConfiguration wrapped = TraceUtils.wrapHadoopConf(TEST_PREFIX, conf, extraConfig);
assertEquals(newValue, wrapped.get(key)); assertEquals(newValue, wrapped.get(key));
} }
} }

View File

@ -617,6 +617,9 @@ Release 2.7.1 - UNRELEASED
HDFS-7770. Need document for storage type label of data node storage HDFS-7770. Need document for storage type label of data node storage
locations under dfs.data.dir. (Xiaoyu Yao via aajisaka) locations under dfs.data.dir. (Xiaoyu Yao via aajisaka)
HDFS-8213. DFSClient should use hdfs.client.htrace HTrace configuration
prefix rather than hadoop.htrace (cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -299,8 +299,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats) Configuration conf, FileSystem.Statistics stats)
throws IOException { throws IOException {
SpanReceiverHost.getInstance(conf); SpanReceiverHost.get(conf, DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX);
traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build(); traceSampler = new SamplerBuilder(TraceUtils.
wrapHadoopConf(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf)).build();
// Copy only the required DFSClient configuration // Copy only the required DFSClient configuration
this.dfsClientConf = new DfsClientConf(conf); this.dfsClientConf = new DfsClientConf(conf);
this.conf = conf; this.conf = conf;

View File

@ -55,6 +55,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT = public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT; HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
// HDFS HTrace configuration is controlled by dfs.htrace.spanreceiver.classes,
// etc.
public static final String DFS_SERVER_HTRACE_PREFIX = "dfs.htrace.";
// HDFS client HTrace configuration.
public static final String DFS_CLIENT_HTRACE_PREFIX = "dfs.client.htrace.";
// HA related configuration // HA related configuration
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration"; public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
public static final long DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50; public static final long DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50;

View File

@ -1099,7 +1099,8 @@ public class DataNode extends ReconfigurableBase
this.dnConf = new DNConf(conf); this.dnConf = new DNConf(conf);
checkSecureConfig(dnConf, conf, resources); checkSecureConfig(dnConf, conf, resources);
this.spanReceiverHost = SpanReceiverHost.getInstance(conf); this.spanReceiverHost =
SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
if (dnConf.maxLockedMemory > 0) { if (dnConf.maxLockedMemory > 0) {
if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) { if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {

View File

@ -639,7 +639,8 @@ public class NameNode implements NameNodeStatusMXBean {
startHttpServer(conf); startHttpServer(conf);
} }
this.spanReceiverHost = SpanReceiverHost.getInstance(conf); this.spanReceiverHost =
SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
loadNamesystem(conf); loadNamesystem(conf);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.tracing; package org.apache.hadoop.tracing;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.Assert; import org.junit.Assert;
@ -57,7 +58,8 @@ public class TestTraceAdmin {
public void testCreateAndDestroySpanReceiver() throws Exception { public void testCreateAndDestroySpanReceiver() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf = new Configuration(); conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, ""); conf.set(DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX +
SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX, "");
MiniDFSCluster cluster = MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive(); cluster.waitActive();

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
@ -53,14 +54,9 @@ public class TestTracing {
private static Configuration conf; private static Configuration conf;
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs; private static DistributedFileSystem dfs;
private static SpanReceiverHost spanReceiverHost;
@Test @Test
public void testTracing() throws Exception { public void testTracing() throws Exception {
// getting instance already loaded.
Assert.assertEquals(spanReceiverHost,
SpanReceiverHost.getInstance(new Configuration()));
// write and read without tracing started // write and read without tracing started
String fileName = "testTracingDisabled.dat"; String fileName = "testTracingDisabled.dat";
writeTestFile(fileName); writeTestFile(fileName);
@ -196,9 +192,9 @@ public class TestTracing {
public static void setup() throws IOException { public static void setup() throws IOException {
conf = new Configuration(); conf = new Configuration();
conf.setLong("dfs.blocksize", 100 * 1024); conf.setLong("dfs.blocksize", 100 * 1024);
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX +
SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX,
SetSpanReceiver.class.getName()); SetSpanReceiver.class.getName());
spanReceiverHost = SpanReceiverHost.getInstance(conf);
} }
@Before @Before

View File

@ -64,7 +64,8 @@ public class TestTracingShortCircuitLocalRead {
public void testShortCircuitTraceHooks() throws IOException { public void testShortCircuitTraceHooks() throws IOException {
assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS); assumeTrue(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS);
conf = new Configuration(); conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX +
SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX,
TestTracing.SetSpanReceiver.class.getName()); TestTracing.SetSpanReceiver.class.getName());
conf.setLong("dfs.blocksize", 100 * 1024); conf.setLong("dfs.blocksize", 100 * 1024);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
@ -78,7 +79,6 @@ public class TestTracingShortCircuitLocalRead {
dfs = cluster.getFileSystem(); dfs = cluster.getFileSystem();
try { try {
spanReceiverHost = SpanReceiverHost.getInstance(conf);
DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L); DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);
TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS); TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);