HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. McCabe via yliu)

This commit is contained in:
yliu 2015-01-16 00:21:38 +08:00
parent 2e4df87104
commit bdbf13ac46
8 changed files with 476 additions and 138 deletions

View File

@ -247,6 +247,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7457. DatanodeID generates excessive garbage. (daryn via kihwal) HDFS-7457. DatanodeID generates excessive garbage. (daryn via kihwal)
HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P.
McCabe via yliu)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -51,6 +51,10 @@
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -71,7 +75,7 @@ class BlockStorageLocationUtil {
*/ */
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables( private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int timeout, boolean connectToDnViaHostname) { int timeout, boolean connectToDnViaHostname, Span parent) {
if (datanodeBlocks.isEmpty()) { if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList(); return Lists.newArrayList();
@ -111,7 +115,7 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
} }
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, poolId, blockIds, dnTokens, timeout, conf, datanode, poolId, blockIds, dnTokens, timeout,
connectToDnViaHostname); connectToDnViaHostname, parent);
callables.add(callable); callables.add(callable);
} }
return callables; return callables;
@ -131,11 +135,11 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata( static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeoutMs, boolean connectToDnViaHostname) int poolsize, int timeoutMs, boolean connectToDnViaHostname)
throws InvalidBlockTokenException { throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables = List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
connectToDnViaHostname); connectToDnViaHostname, Trace.currentSpan());
// Use a thread pool to execute the Callables in parallel // Use a thread pool to execute the Callables in parallel
List<Future<HdfsBlocksMetadata>> futures = List<Future<HdfsBlocksMetadata>> futures =
@ -319,11 +323,12 @@ private static class VolumeBlockLocationCallable implements
private final long[] blockIds; private final long[] blockIds;
private final List<Token<BlockTokenIdentifier>> dnTokens; private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname; private final boolean connectToDnViaHostname;
private final Span parentSpan;
VolumeBlockLocationCallable(Configuration configuration, VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, String poolId, long []blockIds, DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout, List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
boolean connectToDnViaHostname) { boolean connectToDnViaHostname, Span parentSpan) {
this.configuration = configuration; this.configuration = configuration;
this.timeout = timeout; this.timeout = timeout;
this.datanode = datanode; this.datanode = datanode;
@ -331,6 +336,7 @@ private static class VolumeBlockLocationCallable implements
this.blockIds = blockIds; this.blockIds = blockIds;
this.dnTokens = dnTokens; this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname; this.connectToDnViaHostname = connectToDnViaHostname;
this.parentSpan = parentSpan;
} }
public DatanodeInfo getDatanodeInfo() { public DatanodeInfo getDatanodeInfo() {
@ -342,6 +348,8 @@ public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null; HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC // Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null; ClientDatanodeProtocol cdp = null;
TraceScope scope =
Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname); timeout, connectToDnViaHostname);
@ -350,6 +358,7 @@ public HdfsBlocksMetadata call() throws Exception {
// Bubble this up to the caller, handle with the Future // Bubble this up to the caller, handle with the Future
throw e; throw e;
} finally { } finally {
scope.close();
if (cdp != null) { if (cdp != null) {
RPC.stopProxy(cdp); RPC.stopProxy(cdp);
} }

View File

@ -26,6 +26,9 @@
import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -44,6 +47,11 @@ public class DFSInotifyEventInputStream {
public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
.class); .class);
/**
* The trace sampler to use when making RPCs to the NameNode.
*/
private final Sampler<?> traceSampler;
private final ClientProtocol namenode; private final ClientProtocol namenode;
private Iterator<EventBatch> it; private Iterator<EventBatch> it;
private long lastReadTxid; private long lastReadTxid;
@ -59,12 +67,15 @@ public class DFSInotifyEventInputStream {
private static final int INITIAL_WAIT_MS = 10; private static final int INITIAL_WAIT_MS = 10;
DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException { DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's throws IOException {
// Only consider new transaction IDs.
this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
} }
DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid) DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
throws IOException { long lastReadTxid) throws IOException {
this.traceSampler = traceSampler;
this.namenode = namenode; this.namenode = namenode;
this.it = Iterators.emptyIterator(); this.it = Iterators.emptyIterator();
this.lastReadTxid = lastReadTxid; this.lastReadTxid = lastReadTxid;
@ -87,39 +98,45 @@ public class DFSInotifyEventInputStream {
* The next available batch of events will be returned. * The next available batch of events will be returned.
*/ */
public EventBatch poll() throws IOException, MissingEventsException { public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid TraceScope scope =
if (lastReadTxid == -1) { Trace.startSpan("inotifyPoll", traceSampler);
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); try {
lastReadTxid = namenode.getCurrentEditLogTxid(); // need to keep retrying until the NN sends us the latest committed txid
return null; if (lastReadTxid == -1) {
} LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
if (!it.hasNext()) { lastReadTxid = namenode.getCurrentEditLogTxid();
EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
throw new MissingEventsException(formerLastReadTxid + 1,
el.getFirstTxid());
}
} else {
LOG.debug("poll(): read no edits from the NN when requesting edits " +
"after txid {}", lastReadTxid);
return null; return null;
} }
} if (!it.hasNext()) {
EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
throw new MissingEventsException(formerLastReadTxid + 1,
el.getFirstTxid());
}
} else {
LOG.debug("poll(): read no edits from the NN when requesting edits " +
"after txid {}", lastReadTxid);
return null;
}
}
if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
// newly seen edit log ops actually got converted to events // newly seen edit log ops actually got converted to events
return it.next(); return it.next();
} else { } else {
return null; return null;
}
} finally {
scope.close();
} }
} }
@ -163,25 +180,29 @@ public long getTxidsBehindEstimate() {
*/ */
public EventBatch poll(long time, TimeUnit tu) throws IOException, public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException { InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow(); TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
EventBatch next = null; EventBatch next = null;
while ((next = poll()) == null) { try {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime); long initialTime = Time.monotonicNow();
if (timeLeft <= 0) { long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
LOG.debug("timed poll(): timed out"); long nextWait = INITIAL_WAIT_MS;
break; while ((next = poll()) == null) {
} else if (timeLeft < nextWait * 2) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
nextWait = timeLeft; if (timeLeft <= 0) {
} else { LOG.debug("timed poll(): timed out");
nextWait *= 2; break;
} else if (timeLeft < nextWait * 2) {
nextWait = timeLeft;
} else {
nextWait *= 2;
}
LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
nextWait);
Thread.sleep(nextWait);
} }
LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", } finally {
nextWait); scope.close();
Thread.sleep(nextWait);
} }
return next; return next;
} }
@ -196,16 +217,21 @@ public EventBatch poll(long time, TimeUnit tu) throws IOException,
*/ */
public EventBatch take() throws IOException, InterruptedException, public EventBatch take() throws IOException, InterruptedException,
MissingEventsException { MissingEventsException {
TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
EventBatch next = null; EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS; try {
while ((next = poll()) == null) { int nextWaitMin = INITIAL_WAIT_MS;
// sleep for a random period between nextWaitMin and nextWaitMin * 2 while ((next = poll()) == null) {
// to avoid stampedes at the NN if there are multiple clients // sleep for a random period between nextWaitMin and nextWaitMin * 2
int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); // to avoid stampedes at the NN if there are multiple clients
LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
Thread.sleep(sleepTime); LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
// the maximum sleep is 2 minutes Thread.sleep(sleepTime);
nextWaitMin = Math.min(60000, nextWaitMin * 2); // the maximum sleep is 2 minutes
nextWaitMin = Math.min(60000, nextWaitMin * 2);
}
} finally {
scope.close();
} }
return next; return next;

View File

@ -27,6 +27,9 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* CacheDirectiveIterator is a remote iterator that iterates cache directives. * CacheDirectiveIterator is a remote iterator that iterates cache directives.
@ -39,12 +42,14 @@ public class CacheDirectiveIterator
private CacheDirectiveInfo filter; private CacheDirectiveInfo filter;
private final ClientProtocol namenode; private final ClientProtocol namenode;
private final Sampler<?> traceSampler;
public CacheDirectiveIterator(ClientProtocol namenode, public CacheDirectiveIterator(ClientProtocol namenode,
CacheDirectiveInfo filter) { CacheDirectiveInfo filter, Sampler<?> traceSampler) {
super(0L); super(0L);
this.namenode = namenode; this.namenode = namenode;
this.filter = filter; this.filter = filter;
this.traceSampler = traceSampler;
} }
private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) { private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
@ -89,6 +94,7 @@ public boolean hasMore() {
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey) public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
throws IOException { throws IOException {
BatchedEntries<CacheDirectiveEntry> entries = null; BatchedEntries<CacheDirectiveEntry> entries = null;
TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler);
try { try {
entries = namenode.listCacheDirectives(prevKey, filter); entries = namenode.listCacheDirectives(prevKey, filter);
} catch (IOException e) { } catch (IOException e) {
@ -110,6 +116,8 @@ public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
"Did not find requested id " + id); "Did not find requested id " + id);
} }
throw e; throw e;
} finally {
scope.close();
} }
Preconditions.checkNotNull(entries); Preconditions.checkNotNull(entries);
return entries; return entries;

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator; import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* CachePoolIterator is a remote iterator that iterates cache pools. * CachePoolIterator is a remote iterator that iterates cache pools.
@ -34,16 +37,23 @@ public class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> { extends BatchedRemoteIterator<String, CachePoolEntry> {
private final ClientProtocol namenode; private final ClientProtocol namenode;
private final Sampler traceSampler;
public CachePoolIterator(ClientProtocol namenode) { public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) {
super(""); super("");
this.namenode = namenode; this.namenode = namenode;
this.traceSampler = traceSampler;
} }
@Override @Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException { throws IOException {
return namenode.listCachePools(prevKey); TraceScope scope = Trace.startSpan("listCachePools", traceSampler);
try {
return namenode.listCachePools(prevKey);
} finally {
scope.close();
}
} }
@Override @Override

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator; import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* EncryptionZoneIterator is a remote iterator that iterates over encryption * EncryptionZoneIterator is a remote iterator that iterates over encryption
@ -34,16 +37,24 @@ public class EncryptionZoneIterator
extends BatchedRemoteIterator<Long, EncryptionZone> { extends BatchedRemoteIterator<Long, EncryptionZone> {
private final ClientProtocol namenode; private final ClientProtocol namenode;
private final Sampler<?> traceSampler;
public EncryptionZoneIterator(ClientProtocol namenode) { public EncryptionZoneIterator(ClientProtocol namenode,
Sampler<?> traceSampler) {
super(Long.valueOf(0)); super(Long.valueOf(0));
this.namenode = namenode; this.namenode = namenode;
this.traceSampler = traceSampler;
} }
@Override @Override
public BatchedEntries<EncryptionZone> makeRequest(Long prevId) public BatchedEntries<EncryptionZone> makeRequest(Long prevId)
throws IOException { throws IOException {
return namenode.listEncryptionZones(prevId); TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler);
try {
return namenode.listEncryptionZones(prevId);
} finally {
scope.close();
}
} }
@Override @Override

View File

@ -88,6 +88,7 @@
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.htrace.Sampler;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -908,7 +909,7 @@ public Boolean get() {
// Uncache and check each path in sequence // Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries = RemoteIterator<CacheDirectiveEntry> entries =
new CacheDirectiveIterator(nnRpc, null); new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER);
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next(); CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId()); nnRpc.removeCacheDirective(entry.getInfo().getId());