From c4ccbe62c0857261b197a68c1e03a02e92f21a38 Mon Sep 17 00:00:00 2001 From: yliu Date: Fri, 16 Jan 2015 00:23:51 +0800 Subject: [PATCH] HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. McCabe via yliu) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/BlockStorageLocationUtil.java | 19 +- .../org/apache/hadoop/hdfs/DFSClient.java | 407 +++++++++++++++--- .../hdfs/DFSInotifyEventInputStream.java | 146 ++++--- .../hdfs/protocol/CacheDirectiveIterator.java | 10 +- .../hdfs/protocol/CachePoolIterator.java | 14 +- .../hdfs/protocol/EncryptionZoneIterator.java | 15 +- .../server/namenode/TestCacheDirectives.java | 3 +- 8 files changed, 479 insertions(+), 138 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2d4c6348389..34ad10fde87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -517,6 +517,9 @@ Release 2.7.0 - UNRELEASED HDFS-7457. DatanodeID generates excessive garbage. (daryn via kihwal) + HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. + McCabe via yliu) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java index ba749783f5f..7f992c1926f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java @@ -51,6 +51,10 @@ import org.apache.hadoop.security.token.Token; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.htrace.Sampler; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; @InterfaceAudience.Private @InterfaceStability.Unstable @@ -71,7 +75,7 @@ class BlockStorageLocationUtil { */ private static List createVolumeBlockLocationCallables( Configuration conf, Map> datanodeBlocks, - int timeout, boolean connectToDnViaHostname) { + int timeout, boolean connectToDnViaHostname, Span parent) { if (datanodeBlocks.isEmpty()) { return Lists.newArrayList(); @@ -111,7 +115,7 @@ class BlockStorageLocationUtil { } VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( conf, datanode, poolId, blockIds, dnTokens, timeout, - connectToDnViaHostname); + connectToDnViaHostname, parent); callables.add(callable); } return callables; @@ -131,11 +135,11 @@ class BlockStorageLocationUtil { static Map queryDatanodesForHdfsBlocksMetadata( Configuration conf, Map> datanodeBlocks, int poolsize, int timeoutMs, boolean connectToDnViaHostname) - throws InvalidBlockTokenException { + throws InvalidBlockTokenException { List callables = createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, - connectToDnViaHostname); + connectToDnViaHostname, Trace.currentSpan()); // Use a thread pool to execute the Callables in parallel List> futures = @@ -319,11 +323,12 @@ class BlockStorageLocationUtil { private final long[] blockIds; private final List> dnTokens; private final boolean connectToDnViaHostname; + private final Span parentSpan; VolumeBlockLocationCallable(Configuration configuration, DatanodeInfo datanode, String poolId, long []blockIds, List> dnTokens, int timeout, - boolean connectToDnViaHostname) { + boolean connectToDnViaHostname, Span parentSpan) { this.configuration = configuration; this.timeout = timeout; this.datanode = datanode; @@ -331,6 +336,7 @@ class BlockStorageLocationUtil { this.blockIds = blockIds; this.dnTokens = dnTokens; this.connectToDnViaHostname = connectToDnViaHostname; + this.parentSpan = parentSpan; } public DatanodeInfo getDatanodeInfo() { @@ -342,6 +348,8 @@ class BlockStorageLocationUtil { HdfsBlocksMetadata metadata = null; // Create the RPC proxy and make the RPC ClientDatanodeProtocol cdp = null; + TraceScope scope = + Trace.startSpan("getHdfsBlocksMetadata", parentSpan); try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); @@ -350,6 +358,7 @@ class BlockStorageLocationUtil { // Bubble this up to the caller, handle with the Future throw e; } finally { + scope.close(); if (cdp != null) { RPC.stopProxy(cdp); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f289da76886..1bb7f4a787b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -998,11 +998,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getPreferredBlockSize(String) */ public long getBlockSize(String f) throws IOException { + TraceScope scope = getPathTraceScope("getBlockSize", f); try { return namenode.getPreferredBlockSize(f); } catch (IOException ie) { LOG.warn("Problem getting block size", ie); throw ie; + } finally { + scope.close(); } } @@ -1035,17 +1038,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public Token getDelegationToken(Text renewer) throws IOException { assert dtService != null; - Token token = - namenode.getDelegationToken(renewer); - - if (token != null) { - token.setService(this.dtService); - LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); - } else { - LOG.info("Cannot get delegation token from " + renewer); + TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler); + try { + Token token = + namenode.getDelegationToken(renewer); + if (token != null) { + token.setService(this.dtService); + LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); + } else { + LOG.info("Cannot get delegation token from " + renewer); + } + return token; + } finally { + scope.close(); } - return token; - } /** @@ -1216,7 +1222,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @VisibleForTesting public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException { - return callGetBlockLocations(namenode, src, start, length); + TraceScope scope = getPathTraceScope("getBlockLocations", src); + try { + return callGetBlockLocations(namenode, src, start, length); + } finally { + scope.close(); + } } /** @@ -1243,12 +1254,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, boolean recoverLease(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("recoverLease", src); try { return namenode.recoverLease(src, clientName); } catch (RemoteException re) { throw re.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -1265,14 +1279,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * as the data-block the task processes. */ public BlockLocation[] getBlockLocations(String src, long start, - long length) throws IOException, UnresolvedLinkException { - LocatedBlocks blocks = getLocatedBlocks(src, start, length); - BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); - HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; - for (int i = 0; i < locations.length; i++) { - hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); + long length) throws IOException, UnresolvedLinkException { + TraceScope scope = getPathTraceScope("getBlockLocations", src); + try { + LocatedBlocks blocks = getLocatedBlocks(src, start, length); + BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks); + HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length]; + for (int i = 0; i < locations.length; i++) { + hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i)); + } + return hdfsLocations; + } finally { + scope.close(); } - return hdfsLocations; } /** @@ -1326,15 +1345,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } // Make RPCs to the datanodes to get volume locations for its replicas - Map metadatas = BlockStorageLocationUtil - .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, - getConf().getFileBlockStorageLocationsNumThreads, - getConf().getFileBlockStorageLocationsTimeoutMs, - getConf().connectToDnViaHostname); - - if (LOG.isTraceEnabled()) { - LOG.trace("metadata returned: " - + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); + TraceScope scope = + Trace.startSpan("getBlockStorageLocations", traceSampler); + Map metadatas; + try { + metadatas = BlockStorageLocationUtil. + queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, + getConf().getFileBlockStorageLocationsNumThreads, + getConf().getFileBlockStorageLocationsTimeoutMs, + getConf().connectToDnViaHostname); + if (LOG.isTraceEnabled()) { + LOG.trace("metadata returned: " + + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); + } + } finally { + scope.close(); } // Regroup the returned VolumeId metadata to again be grouped by @@ -1354,19 +1379,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo feInfo) throws IOException { - if (provider == null) { - throw new IOException("No KeyProvider is configured, cannot access" + - " an encrypted file"); - } - EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( - feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), - feInfo.getEncryptedDataEncryptionKey()); + TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler); try { - KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension - .createKeyProviderCryptoExtension(provider); - return cryptoProvider.decryptEncryptedKey(ekv); - } catch (GeneralSecurityException e) { - throw new IOException(e); + if (provider == null) { + throw new IOException("No KeyProvider is configured, cannot access" + + " an encrypted file"); + } + EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption( + feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(), + feInfo.getEncryptedDataEncryptionKey()); + try { + KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension + .createKeyProviderCryptoExtension(provider); + return cryptoProvider.decryptEncryptedKey(ekv); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } finally { + scope.close(); } } @@ -1504,7 +1534,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException, UnresolvedLinkException { checkOpen(); // Get block info from namenode - return new DFSInputStream(this, src, verifyChecksum); + TraceScope scope = getPathTraceScope("newDFSInputStream", src); + try { + return new DFSInputStream(this, src, verifyChecksum); + } finally { + scope.close(); + } } /** @@ -1737,6 +1772,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void createSymlink(String target, String link, boolean createParent) throws IOException { + TraceScope scope = getPathTraceScope("createSymlink", target); try { FsPermission dirPerm = FsPermission.getDefault().applyUMask(dfsClientConf.uMask); @@ -1750,6 +1786,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1760,11 +1798,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public String getLinkTarget(String path) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getLinkTarget", path); try { return namenode.getLinkTarget(path); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class); + } finally { + scope.close(); } } @@ -1824,6 +1865,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean setReplication(String src, short replication) throws IOException { + TraceScope scope = getPathTraceScope("setReplication", src); try { return namenode.setReplication(src, replication); } catch(RemoteException re) { @@ -1833,6 +1875,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1843,6 +1887,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void setStoragePolicy(String src, String policyName) throws IOException { + TraceScope scope = getPathTraceScope("setStoragePolicy", src); try { namenode.setStoragePolicy(src, policyName); } catch (RemoteException e) { @@ -1852,6 +1897,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, NSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1859,7 +1906,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @return All the existing storage policies */ public BlockStoragePolicy[] getStoragePolicies() throws IOException { - return namenode.getStoragePolicies(); + TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler); + try { + return namenode.getStoragePolicies(); + } finally { + scope.close(); + } } /** @@ -1870,6 +1922,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @Deprecated public boolean rename(String src, String dst) throws IOException { checkOpen(); + TraceScope scope = getSrcDstTraceScope("rename", src, dst); try { return namenode.rename(src, dst); } catch(RemoteException re) { @@ -1878,6 +1931,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1887,12 +1942,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void concat(String trg, String [] srcs) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("concat", traceSampler); try { namenode.concat(trg, srcs); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } /** @@ -1902,6 +1960,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void rename(String src, String dst, Options.Rename... options) throws IOException { checkOpen(); + TraceScope scope = getSrcDstTraceScope("rename2", src, dst); try { namenode.rename2(src, dst, options); } catch(RemoteException re) { @@ -1914,6 +1973,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, NSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1923,11 +1984,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean truncate(String src, long newLength) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("truncate", src); try { return namenode.truncate(src, newLength, clientName); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -1938,7 +2002,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @Deprecated public boolean delete(String src) throws IOException { checkOpen(); - return namenode.delete(src, true); + return delete(src, true); } /** @@ -1950,6 +2014,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean delete(String src, boolean recursive) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("delete", src); try { return namenode.delete(src, recursive); } catch(RemoteException re) { @@ -1958,6 +2023,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -1987,15 +2054,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getListing(String, byte[], boolean) */ public DirectoryListing listPaths(String src, byte[] startAfter, - boolean needLocation) - throws IOException { + boolean needLocation) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("listPaths", src); try { return namenode.getListing(src, startAfter, needLocation); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -2009,12 +2078,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public HdfsFileStatus getFileInfo(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getFileInfo", src); try { return namenode.getFileInfo(src); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -2024,12 +2096,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean isFileClosed(String src) throws IOException{ checkOpen(); + TraceScope scope = getPathTraceScope("isFileClosed", src); try { return namenode.isFileClosed(src); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -2043,12 +2118,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public HdfsFileStatus getFileLinkInfo(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getFileLinkInfo", src); try { return namenode.getFileLinkInfo(src); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, UnresolvedPathException.class); - } + } finally { + scope.close(); + } } @InterfaceAudience.Private @@ -2345,6 +2423,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setPermission(String src, FsPermission permission) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("setPermission", src); try { namenode.setPermission(src, permission); } catch(RemoteException re) { @@ -2353,6 +2432,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -2367,6 +2448,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setOwner(String src, String username, String groupname) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("setOwner", src); try { namenode.setOwner(src, username, groupname); } catch(RemoteException re) { @@ -2375,6 +2457,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); + } + } + + private long[] callGetStats() throws IOException { + checkOpen(); + TraceScope scope = Trace.startSpan("getStats", traceSampler); + try { + return namenode.getStats(); + } finally { + scope.close(); } } @@ -2382,7 +2476,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getStats() */ public FsStatus getDiskStatus() throws IOException { - long rawNums[] = namenode.getStats(); + long rawNums[] = callGetStats(); return new FsStatus(rawNums[0], rawNums[1], rawNums[2]); } @@ -2392,7 +2486,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @throws IOException */ public long getMissingBlocksCount() throws IOException { - return namenode.getStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]; + return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX]; } /** @@ -2401,7 +2495,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @throws IOException */ public long getMissingReplOneBlocksCount() throws IOException { - return namenode.getStats()[ClientProtocol. + return callGetStats()[ClientProtocol. GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX]; } @@ -2410,7 +2504,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @throws IOException */ public long getUnderReplicatedBlocksCount() throws IOException { - return namenode.getStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]; + return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]; } /** @@ -2418,7 +2512,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @throws IOException */ public long getCorruptBlocksCount() throws IOException { - return namenode.getStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]; + return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX]; } /** @@ -2427,18 +2521,37 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) - throws IOException { - return namenode.listCorruptFileBlocks(path, cookie); + throws IOException { + checkOpen(); + TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path); + try { + return namenode.listCorruptFileBlocks(path, cookie); + } finally { + scope.close(); + } } public DatanodeInfo[] datanodeReport(DatanodeReportType type) - throws IOException { - return namenode.getDatanodeReport(type); + throws IOException { + checkOpen(); + TraceScope scope = Trace.startSpan("datanodeReport", traceSampler); + try { + return namenode.getDatanodeReport(type); + } finally { + scope.close(); + } } public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { - return namenode.getDatanodeStorageReport(type); + checkOpen(); + TraceScope scope = + Trace.startSpan("datanodeStorageReport", traceSampler); + try { + return namenode.getDatanodeStorageReport(type); + } finally { + scope.close(); + } } /** @@ -2462,7 +2575,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean) */ public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ - return namenode.setSafeMode(action, isChecked); + TraceScope scope = + Trace.startSpan("setSafeMode", traceSampler); + try { + return namenode.setSafeMode(action, isChecked); + } finally { + scope.close(); + } } /** @@ -2476,10 +2595,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("createSnapshot", traceSampler); try { return namenode.createSnapshot(snapshotRoot, snapshotName); } catch(RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2494,10 +2616,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void deleteSnapshot(String snapshotRoot, String snapshotName) throws IOException { + checkOpen(); + TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler); try { namenode.deleteSnapshot(snapshotRoot, snapshotName); } catch(RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2512,10 +2638,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void renameSnapshot(String snapshotDir, String snapshotOldName, String snapshotNewName) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("renameSnapshot", traceSampler); try { namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName); } catch(RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2528,10 +2657,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("getSnapshottableDirListing", + traceSampler); try { return namenode.getSnapshottableDirListing(); } catch(RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2542,10 +2675,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void allowSnapshot(String snapshotRoot) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("allowSnapshot", traceSampler); try { namenode.allowSnapshot(snapshotRoot); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2556,10 +2692,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void disallowSnapshot(String snapshotRoot) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("disallowSnapshot", traceSampler); try { namenode.disallowSnapshot(snapshotRoot); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } @@ -2571,78 +2710,99 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir, String fromSnapshot, String toSnapshot) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("getSnapshotDiffReport", traceSampler); try { return namenode.getSnapshotDiffReport(snapshotDir, fromSnapshot, toSnapshot); } catch(RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public long addCacheDirective( CacheDirectiveInfo info, EnumSet flags) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("addCacheDirective", traceSampler); try { return namenode.addCacheDirective(info, flags); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public void modifyCacheDirective( CacheDirectiveInfo info, EnumSet flags) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("modifyCacheDirective", traceSampler); try { namenode.modifyCacheDirective(info, flags); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public void removeCacheDirective(long id) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("removeCacheDirective", traceSampler); try { namenode.removeCacheDirective(id); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public RemoteIterator listCacheDirectives( CacheDirectiveInfo filter) throws IOException { - return new CacheDirectiveIterator(namenode, filter); + return new CacheDirectiveIterator(namenode, filter, traceSampler); } public void addCachePool(CachePoolInfo info) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("addCachePool", traceSampler); try { namenode.addCachePool(info); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public void modifyCachePool(CachePoolInfo info) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("modifyCachePool", traceSampler); try { namenode.modifyCachePool(info); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public void removeCachePool(String poolName) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("removeCachePool", traceSampler); try { namenode.removeCachePool(poolName); } catch (RemoteException re) { throw re.unwrapRemoteException(); + } finally { + scope.close(); } } public RemoteIterator listCachePools() throws IOException { - return new CachePoolIterator(namenode); + return new CachePoolIterator(namenode, traceSampler); } /** @@ -2651,10 +2811,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#saveNamespace() */ void saveNamespace() throws AccessControlException, IOException { + TraceScope scope = Trace.startSpan("saveNamespace", traceSampler); try { namenode.saveNamespace(); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class); + } finally { + scope.close(); } } @@ -2665,10 +2828,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#rollEdits() */ long rollEdits() throws AccessControlException, IOException { + TraceScope scope = Trace.startSpan("rollEdits", traceSampler); try { return namenode.rollEdits(); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class); + } finally { + scope.close(); } } @@ -2684,7 +2850,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ boolean restoreFailedStorage(String arg) throws AccessControlException, IOException{ - return namenode.restoreFailedStorage(arg); + TraceScope scope = Trace.startSpan("restoreFailedStorage", traceSampler); + try { + return namenode.restoreFailedStorage(arg); + } finally { + scope.close(); + } } /** @@ -2695,7 +2866,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#refreshNodes() */ public void refreshNodes() throws IOException { - namenode.refreshNodes(); + TraceScope scope = Trace.startSpan("refreshNodes", traceSampler); + try { + namenode.refreshNodes(); + } finally { + scope.close(); + } } /** @@ -2704,7 +2880,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#metaSave(String) */ public void metaSave(String pathname) throws IOException { - namenode.metaSave(pathname); + TraceScope scope = Trace.startSpan("metaSave", traceSampler); + try { + namenode.metaSave(pathname); + } finally { + scope.close(); + } } /** @@ -2716,18 +2897,33 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#setBalancerBandwidth(long) */ public void setBalancerBandwidth(long bandwidth) throws IOException { - namenode.setBalancerBandwidth(bandwidth); + TraceScope scope = Trace.startSpan("setBalancerBandwidth", traceSampler); + try { + namenode.setBalancerBandwidth(bandwidth); + } finally { + scope.close(); + } } /** * @see ClientProtocol#finalizeUpgrade() */ public void finalizeUpgrade() throws IOException { - namenode.finalizeUpgrade(); + TraceScope scope = Trace.startSpan("finalizeUpgrade", traceSampler); + try { + namenode.finalizeUpgrade(); + } finally { + scope.close(); + } } RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException { - return namenode.rollingUpgrade(action); + TraceScope scope = Trace.startSpan("rollingUpgrade", traceSampler); + try { + return namenode.rollingUpgrade(action); + } finally { + scope.close(); + } } /** @@ -2784,6 +2980,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + absPermission); } + TraceScope scope = Trace.startSpan("mkdir", traceSampler); try { return namenode.mkdirs(src, absPermission, createParent); } catch(RemoteException re) { @@ -2797,6 +2994,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -2807,12 +3006,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getContentSummary(String) */ ContentSummary getContentSummary(String src) throws IOException { + TraceScope scope = getPathTraceScope("getContentSummary", src); try { return namenode.getContentSummary(src); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -2832,6 +3034,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, diskspaceQuota); } + TraceScope scope = getPathTraceScope("setQuota", src); try { namenode.setQuota(src, namespaceQuota, diskspaceQuota); } catch(RemoteException re) { @@ -2841,6 +3044,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DSQuotaExceededException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -2851,6 +3056,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void setTimes(String src, long mtime, long atime) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("setTimes", src); try { namenode.setTimes(src, mtime, atime); } catch(RemoteException re) { @@ -2858,6 +3064,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, FileNotFoundException.class, UnresolvedPathException.class, SnapshotAccessControlException.class); + } finally { + scope.close(); } } @@ -2909,6 +3117,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void modifyAclEntries(String src, List aclSpec) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("modifyAclEntries", src); try { namenode.modifyAclEntries(src, aclSpec); } catch(RemoteException re) { @@ -2919,12 +3128,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void removeAclEntries(String src, List aclSpec) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("removeAclEntries", traceSampler); try { namenode.removeAclEntries(src, aclSpec); } catch(RemoteException re) { @@ -2935,11 +3147,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void removeDefaultAcl(String src) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("removeDefaultAcl", traceSampler); try { namenode.removeDefaultAcl(src); } catch(RemoteException re) { @@ -2950,11 +3165,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void removeAcl(String src) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("removeAcl", traceSampler); try { namenode.removeAcl(src); } catch(RemoteException re) { @@ -2965,11 +3183,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void setAcl(String src, List aclSpec) throws IOException { checkOpen(); + TraceScope scope = Trace.startSpan("setAcl", traceSampler); try { namenode.setAcl(src, aclSpec); } catch(RemoteException re) { @@ -2980,11 +3201,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public AclStatus getAclStatus(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getAclStatus", src); try { return namenode.getAclStatus(src); } catch(RemoteException re) { @@ -2992,41 +3216,50 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, AclException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void createEncryptionZone(String src, String keyName) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("createEncryptionZone", src); try { namenode.createEncryptionZone(src, keyName); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, SafeModeException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public EncryptionZone getEZForPath(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getEZForPath", src); try { return namenode.getEZForPath(src); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public RemoteIterator listEncryptionZones() throws IOException { checkOpen(); - return new EncryptionZoneIterator(namenode); + return new EncryptionZoneIterator(namenode, traceSampler); } public void setXAttr(String src, String name, byte[] value, EnumSet flag) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("setXAttr", src); try { namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag); } catch (RemoteException re) { @@ -3036,11 +3269,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public byte[] getXAttr(String src, String name) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getXAttr", src); try { final List xAttrs = XAttrHelper.buildXAttrAsList(name); final List result = namenode.getXAttrs(src, xAttrs); @@ -3049,23 +3285,29 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public Map getXAttrs(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getXAttrs", src); try { return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null)); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public Map getXAttrs(String src, List names) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("getXAttrs", src); try { return XAttrHelper.buildXAttrMap(namenode.getXAttrs( src, XAttrHelper.buildXAttrs(names))); @@ -3073,12 +3315,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public List listXAttrs(String src) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("listXAttrs", src); try { final Map xattrs = XAttrHelper.buildXAttrMap(namenode.listXAttrs(src)); @@ -3087,11 +3332,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void removeXAttr(String src, String name) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("removeXAttr", src); try { namenode.removeXAttr(src, XAttrHelper.buildXAttr(name)); } catch(RemoteException re) { @@ -3101,27 +3349,32 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, SafeModeException.class, SnapshotAccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public void checkAccess(String src, FsAction mode) throws IOException { checkOpen(); + TraceScope scope = getPathTraceScope("checkAccess", src); try { namenode.checkAccess(src, mode); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { - return new DFSInotifyEventInputStream(namenode); + return new DFSInotifyEventInputStream(traceSampler, namenode); } public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) throws IOException { - return new DFSInotifyEventInputStream(namenode, lastReadTxid); + return new DFSInotifyEventInputStream(traceSampler, namenode, lastReadTxid); } @Override // RemotePeerFactory @@ -3240,4 +3493,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } return scope; } + + private static final byte[] SRC = "src".getBytes(Charset.forName("UTF-8")); + + private static final byte[] DST = "dst".getBytes(Charset.forName("UTF-8")); + + TraceScope getSrcDstTraceScope(String description, String src, String dst) { + TraceScope scope = Trace.startSpan(description, traceSampler); + Span span = scope.getSpan(); + if (span != null) { + if (src != null) { + span.addKVAnnotation(SRC, + src.getBytes(Charset.forName("UTF-8"))); + } + if (dst != null) { + span.addKVAnnotation(DST, + dst.getBytes(Charset.forName("UTF-8"))); + } + } + return scope; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java index 83b92b95387..803e4f2ec63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.util.Time; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +47,11 @@ public class DFSInotifyEventInputStream { public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream .class); + /** + * The trace sampler to use when making RPCs to the NameNode. + */ + private final Sampler traceSampler; + private final ClientProtocol namenode; private Iterator it; private long lastReadTxid; @@ -59,12 +67,15 @@ public class DFSInotifyEventInputStream { private static final int INITIAL_WAIT_MS = 10; - DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException { - this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's + DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode) + throws IOException { + // Only consider new transaction IDs. + this(traceSampler, namenode, namenode.getCurrentEditLogTxid()); } - DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid) - throws IOException { + DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode, + long lastReadTxid) throws IOException { + this.traceSampler = traceSampler; this.namenode = namenode; this.it = Iterators.emptyIterator(); this.lastReadTxid = lastReadTxid; @@ -87,39 +98,45 @@ public class DFSInotifyEventInputStream { * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { - // need to keep retrying until the NN sends us the latest committed txid - if (lastReadTxid == -1) { - LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); - lastReadTxid = namenode.getCurrentEditLogTxid(); - return null; - } - if (!it.hasNext()) { - EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); - if (el.getLastTxid() != -1) { - // we only want to set syncTxid when we were actually able to read some - // edits on the NN -- otherwise it will seem like edits are being - // generated faster than we can read them when the problem is really - // that we are temporarily unable to read edits - syncTxid = el.getSyncTxid(); - it = el.getBatches().iterator(); - long formerLastReadTxid = lastReadTxid; - lastReadTxid = el.getLastTxid(); - if (el.getFirstTxid() != formerLastReadTxid + 1) { - throw new MissingEventsException(formerLastReadTxid + 1, - el.getFirstTxid()); - } - } else { - LOG.debug("poll(): read no edits from the NN when requesting edits " + - "after txid {}", lastReadTxid); + TraceScope scope = + Trace.startSpan("inotifyPoll", traceSampler); + try { + // need to keep retrying until the NN sends us the latest committed txid + if (lastReadTxid == -1) { + LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); + lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } - } + if (!it.hasNext()) { + EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); + if (el.getLastTxid() != -1) { + // we only want to set syncTxid when we were actually able to read some + // edits on the NN -- otherwise it will seem like edits are being + // generated faster than we can read them when the problem is really + // that we are temporarily unable to read edits + syncTxid = el.getSyncTxid(); + it = el.getBatches().iterator(); + long formerLastReadTxid = lastReadTxid; + lastReadTxid = el.getLastTxid(); + if (el.getFirstTxid() != formerLastReadTxid + 1) { + throw new MissingEventsException(formerLastReadTxid + 1, + el.getFirstTxid()); + } + } else { + LOG.debug("poll(): read no edits from the NN when requesting edits " + + "after txid {}", lastReadTxid); + return null; + } + } - if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the - // newly seen edit log ops actually got converted to events - return it.next(); - } else { - return null; + if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the + // newly seen edit log ops actually got converted to events + return it.next(); + } else { + return null; + } + } finally { + scope.close(); } } @@ -163,25 +180,29 @@ public class DFSInotifyEventInputStream { */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { - long initialTime = Time.monotonicNow(); - long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); - long nextWait = INITIAL_WAIT_MS; + TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler); EventBatch next = null; - while ((next = poll()) == null) { - long timeLeft = totalWait - (Time.monotonicNow() - initialTime); - if (timeLeft <= 0) { - LOG.debug("timed poll(): timed out"); - break; - } else if (timeLeft < nextWait * 2) { - nextWait = timeLeft; - } else { - nextWait *= 2; + try { + long initialTime = Time.monotonicNow(); + long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); + long nextWait = INITIAL_WAIT_MS; + while ((next = poll()) == null) { + long timeLeft = totalWait - (Time.monotonicNow() - initialTime); + if (timeLeft <= 0) { + LOG.debug("timed poll(): timed out"); + break; + } else if (timeLeft < nextWait * 2) { + nextWait = timeLeft; + } else { + nextWait *= 2; + } + LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", + nextWait); + Thread.sleep(nextWait); } - LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", - nextWait); - Thread.sleep(nextWait); + } finally { + scope.close(); } - return next; } @@ -196,18 +217,23 @@ public class DFSInotifyEventInputStream { */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { + TraceScope scope = Trace.startSpan("inotifyTake", traceSampler); EventBatch next = null; - int nextWaitMin = INITIAL_WAIT_MS; - while ((next = poll()) == null) { - // sleep for a random period between nextWaitMin and nextWaitMin * 2 - // to avoid stampedes at the NN if there are multiple clients - int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); - LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); - Thread.sleep(sleepTime); - // the maximum sleep is 2 minutes - nextWaitMin = Math.min(60000, nextWaitMin * 2); + try { + int nextWaitMin = INITIAL_WAIT_MS; + while ((next = poll()) == null) { + // sleep for a random period between nextWaitMin and nextWaitMin * 2 + // to avoid stampedes at the NN if there are multiple clients + int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); + LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); + Thread.sleep(sleepTime); + // the maximum sleep is 2 minutes + nextWaitMin = Math.min(60000, nextWaitMin * 2); + } + } finally { + scope.close(); } return next; } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java index 676106de10f..d28b771b573 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java @@ -27,6 +27,9 @@ import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.ipc.RemoteException; import com.google.common.base.Preconditions; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * CacheDirectiveIterator is a remote iterator that iterates cache directives. @@ -39,12 +42,14 @@ public class CacheDirectiveIterator private CacheDirectiveInfo filter; private final ClientProtocol namenode; + private final Sampler traceSampler; public CacheDirectiveIterator(ClientProtocol namenode, - CacheDirectiveInfo filter) { + CacheDirectiveInfo filter, Sampler traceSampler) { super(0L); this.namenode = namenode; this.filter = filter; + this.traceSampler = traceSampler; } private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) { @@ -89,6 +94,7 @@ public class CacheDirectiveIterator public BatchedEntries makeRequest(Long prevKey) throws IOException { BatchedEntries entries = null; + TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler); try { entries = namenode.listCacheDirectives(prevKey, filter); } catch (IOException e) { @@ -110,6 +116,8 @@ public class CacheDirectiveIterator "Did not find requested id " + id); } throw e; + } finally { + scope.close(); } Preconditions.checkNotNull(entries); return entries; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java index 44d6b451742..1f17c8edb33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * CachePoolIterator is a remote iterator that iterates cache pools. @@ -34,16 +37,23 @@ public class CachePoolIterator extends BatchedRemoteIterator { private final ClientProtocol namenode; + private final Sampler traceSampler; - public CachePoolIterator(ClientProtocol namenode) { + public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) { super(""); this.namenode = namenode; + this.traceSampler = traceSampler; } @Override public BatchedEntries makeRequest(String prevKey) throws IOException { - return namenode.listCachePools(prevKey); + TraceScope scope = Trace.startSpan("listCachePools", traceSampler); + try { + return namenode.listCachePools(prevKey); + } finally { + scope.close(); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java index b8c21b048ac..8a648e81df8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java @@ -23,6 +23,9 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * EncryptionZoneIterator is a remote iterator that iterates over encryption @@ -34,16 +37,24 @@ public class EncryptionZoneIterator extends BatchedRemoteIterator { private final ClientProtocol namenode; + private final Sampler traceSampler; - public EncryptionZoneIterator(ClientProtocol namenode) { + public EncryptionZoneIterator(ClientProtocol namenode, + Sampler traceSampler) { super(Long.valueOf(0)); this.namenode = namenode; + this.traceSampler = traceSampler; } @Override public BatchedEntries makeRequest(Long prevId) throws IOException { - return namenode.listEncryptionZones(prevId); + TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler); + try { + return namenode.listEncryptionZones(prevId); + } finally { + scope.close(); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java index 93076928f3e..70410ce4f86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java @@ -88,6 +88,7 @@ import org.apache.hadoop.util.GSet; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.htrace.Sampler; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -908,7 +909,7 @@ public class TestCacheDirectives { // Uncache and check each path in sequence RemoteIterator entries = - new CacheDirectiveIterator(nnRpc, null); + new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER); for (int i=0; i