HDFS-9170. Move libhdfs / fuse-dfs / libwebhdfs to hdfs-client. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-09-29 17:48:29 -07:00
parent 6f335e4f0e
commit d5a9a3daa0
24 changed files with 230 additions and 573 deletions

View File

@ -349,17 +349,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (clientContext.getUseLegacyBlockReaderLocal()) { if (clientContext.getUseLegacyBlockReaderLocal()) {
reader = getLegacyBlockReaderLocal(); reader = getLegacyBlockReaderLocal();
if (reader != null) { if (reader != null) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: returning new legacy block reader local.", this);
LOG.trace(this + ": returning new legacy block reader local.");
}
return reader; return reader;
} }
} else { } else {
reader = getBlockReaderLocal(); reader = getBlockReaderLocal();
if (reader != null) { if (reader != null) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: returning new block reader local.", this);
LOG.trace(this + ": returning new block reader local.");
}
return reader; return reader;
} }
} }
@ -367,10 +363,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (scConf.isDomainSocketDataTraffic()) { if (scConf.isDomainSocketDataTraffic()) {
reader = getRemoteBlockReaderFromDomain(); reader = getRemoteBlockReaderFromDomain();
if (reader != null) { if (reader != null) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: returning new remote block reader using UNIX domain "
LOG.trace(this + ": returning new remote block reader using " + + "socket on {}", this, pathInfo.getPath());
"UNIX domain socket on " + pathInfo.getPath());
}
return reader; return reader;
} }
} }
@ -405,10 +399,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
setVisibleLength(visibleLength). setVisibleLength(visibleLength).
build(); build();
if (accessor == null) { if (accessor == null) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: No ReplicaAccessor created by {}",
LOG.trace(this + ": No ReplicaAccessor created by " + this, cls.getName());
cls.getName());
}
} else { } else {
return new ExternalBlockReader(accessor, visibleLength, startOffset); return new ExternalBlockReader(accessor, visibleLength, startOffset);
} }
@ -427,14 +419,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
* first introduced in HDFS-2246. * first introduced in HDFS-2246.
*/ */
private BlockReader getLegacyBlockReaderLocal() throws IOException { private BlockReader getLegacyBlockReaderLocal() throws IOException {
if (LOG.isTraceEnabled()) { LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
}
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) { if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " + + "{} is not local", this, inetSocketAddress);
"the address " + inetSocketAddress + " is not local");
}
return null; return null;
} }
if (clientContext.getDisableLegacyBlockReaderLocal()) { if (clientContext.getDisableLegacyBlockReaderLocal()) {
@ -470,10 +458,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
} }
private BlockReader getBlockReaderLocal() throws InvalidToken { private BlockReader getBlockReaderLocal() throws InvalidToken {
if (LOG.isTraceEnabled()) { LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
LOG.trace(this + ": trying to construct a BlockReaderLocal " + + " reads.", this);
"for short-circuit reads.");
}
if (pathInfo == null) { if (pathInfo == null) {
pathInfo = clientContext.getDomainSocketFactory() pathInfo = clientContext.getDomainSocketFactory()
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
@ -488,10 +474,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
InvalidToken exc = info.getInvalidTokenException(); InvalidToken exc = info.getInvalidTokenException();
if (exc != null) { if (exc != null) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: got InvalidToken exception while trying to construct "
LOG.trace(this + ": got InvalidToken exception while trying to " + + "BlockReaderLocal via {}", this, pathInfo.getPath());
"construct BlockReaderLocal via " + pathInfo.getPath());
}
throw exc; throw exc;
} }
if (info.getReplica() == null) { if (info.getReplica() == null) {
@ -527,9 +511,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
if (info != null) return info; if (info != null) return info;
} }
if (LOG.isTraceEnabled()) { LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
}
BlockReaderPeer curPeer; BlockReaderPeer curPeer;
while (true) { while (true) {
curPeer = nextDomainPeer(); curPeer = nextDomainPeer();
@ -544,10 +526,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
clientName); clientName);
if (usedPeer.booleanValue()) { if (usedPeer.booleanValue()) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: allocShmSlot used up our previous socket {}. "
LOG.trace(this + ": allocShmSlot used up our previous socket " + + "Allocating a new one...", this, peer.getDomainSocket());
peer.getDomainSocket() + ". Allocating a new one...");
}
curPeer = nextDomainPeer(); curPeer = nextDomainPeer();
if (curPeer == null) break; if (curPeer == null) break;
peer = (DomainPeer)curPeer.peer; peer = (DomainPeer)curPeer.peer;
@ -562,9 +542,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (curPeer.fromCache) { if (curPeer.fromCache) {
// Handle an I/O error we got when using a cached socket. // Handle an I/O error we got when using a cached socket.
// These are considered less serious, because the socket may be stale. // These are considered less serious, because the socket may be stale.
if (LOG.isDebugEnabled()) { LOG.debug("{}: closing stale domain peer {}", this, peer, e);
LOG.debug(this + ": closing stale domain peer " + peer, e);
}
IOUtilsClient.cleanup(LOG, peer); IOUtilsClient.cleanup(LOG, peer);
} else { } else {
// Handle an I/O error we got when using a newly created socket. // Handle an I/O error we got when using a newly created socket.
@ -617,7 +595,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
ExtendedBlockId key = ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
LOG.trace("Sending receipt verification byte for slot " + slot); LOG.trace("Sending receipt verification byte for slot {}", slot);
sock.getOutputStream().write(0); sock.getOutputStream().write(0);
} }
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
@ -650,9 +628,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
String msg = "access control error while " + String msg = "access control error while " +
"attempting to set up short-circuit access to " + "attempting to set up short-circuit access to " +
fileName + resp.getMessage(); fileName + resp.getMessage();
if (LOG.isDebugEnabled()) { LOG.debug("{}:{}", this, msg);
LOG.debug(this + ":" + msg);
}
return new ShortCircuitReplicaInfo(new InvalidToken(msg)); return new ShortCircuitReplicaInfo(new InvalidToken(msg));
default: default:
LOG.warn(this + ": unknown response code " + resp.getStatus() + LOG.warn(this + ": unknown response code " + resp.getStatus() +
@ -684,10 +660,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
" is not usable.", this, pathInfo); " is not usable.", this, pathInfo);
return null; return null;
} }
if (LOG.isTraceEnabled()) { LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
LOG.trace(this + ": trying to create a remote block reader from the " + + "socket at {}", this, pathInfo.getPath());
"UNIX domain socket at " + pathInfo.getPath());
}
while (true) { while (true) {
BlockReaderPeer curPeer = nextDomainPeer(); BlockReaderPeer curPeer = nextDomainPeer();
@ -701,19 +675,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
} catch (IOException ioe) { } catch (IOException ioe) {
IOUtilsClient.cleanup(LOG, peer); IOUtilsClient.cleanup(LOG, peer);
if (isSecurityException(ioe)) { if (isSecurityException(ioe)) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: got security exception while constructing a remote "
LOG.trace(this + ": got security exception while constructing " + + " block reader from the unix domain socket at {}",
"a remote block reader from the unix domain socket at " + this, pathInfo.getPath(), ioe);
pathInfo.getPath(), ioe);
}
throw ioe; throw ioe;
} }
if (curPeer.fromCache) { if (curPeer.fromCache) {
// Handle an I/O error we got when using a cached peer. These are // Handle an I/O error we got when using a cached peer. These are
// considered less serious, because the underlying socket may be stale. // considered less serious, because the underlying socket may be stale.
if (LOG.isDebugEnabled()) { LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
LOG.debug("Closed potentially stale domain peer " + peer, ioe);
}
} else { } else {
// Handle an I/O error we got when using a newly created domain peer. // Handle an I/O error we got when using a newly created domain peer.
// We temporarily disable the domain socket path for a few minutes in // We temporarily disable the domain socket path for a few minutes in
@ -747,10 +717,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
* If there was another problem. * If there was another problem.
*/ */
private BlockReader getRemoteBlockReaderFromTcp() throws IOException { private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
if (LOG.isTraceEnabled()) { LOG.trace("{}: trying to create a remote block reader from a TCP socket",
LOG.trace(this + ": trying to create a remote block reader from a " + this);
"TCP socket");
}
BlockReader blockReader = null; BlockReader blockReader = null;
while (true) { while (true) {
BlockReaderPeer curPeer = null; BlockReaderPeer curPeer = null;
@ -763,19 +731,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
return blockReader; return blockReader;
} catch (IOException ioe) { } catch (IOException ioe) {
if (isSecurityException(ioe)) { if (isSecurityException(ioe)) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: got security exception while constructing a remote "
LOG.trace(this + ": got security exception while constructing " + + "block reader from {}", this, peer, ioe);
"a remote block reader from " + peer, ioe);
}
throw ioe; throw ioe;
} }
if ((curPeer != null) && curPeer.fromCache) { if ((curPeer != null) && curPeer.fromCache) {
// Handle an I/O error we got when using a cached peer. These are // Handle an I/O error we got when using a cached peer. These are
// considered less serious, because the underlying socket may be // considered less serious, because the underlying socket may be
// stale. // stale.
if (LOG.isDebugEnabled()) { LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
LOG.debug("Closed potentially stale remote peer " + peer, ioe);
}
} else { } else {
// Handle an I/O error we got when using a newly created peer. // Handle an I/O error we got when using a newly created peer.
LOG.warn("I/O error constructing remote block reader.", ioe); LOG.warn("I/O error constructing remote block reader.", ioe);
@ -808,9 +772,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (remainingCacheTries > 0) { if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, true); Peer peer = clientContext.getPeerCache().get(datanode, true);
if (peer != null) { if (peer != null) {
if (LOG.isTraceEnabled()) { LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
LOG.trace("nextDomainPeer: reusing existing peer " + peer);
}
return new BlockReaderPeer(peer, true); return new BlockReaderPeer(peer, true);
} }
} }
@ -832,24 +794,18 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (remainingCacheTries > 0) { if (remainingCacheTries > 0) {
Peer peer = clientContext.getPeerCache().get(datanode, false); Peer peer = clientContext.getPeerCache().get(datanode, false);
if (peer != null) { if (peer != null) {
if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
LOG.trace("nextTcpPeer: reusing existing peer " + peer);
}
return new BlockReaderPeer(peer, true); return new BlockReaderPeer(peer, true);
} }
} }
try { try {
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
datanode); datanode);
if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
}
return new BlockReaderPeer(peer, false); return new BlockReaderPeer(peer, false);
} catch (IOException e) { } catch (IOException e) {
if (LOG.isTraceEnabled()) { LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
LOG.trace("nextTcpPeer: failed to create newConnectedPeer " + + "{}", datanode);
"connected to " + datanode);
}
throw e; throw e;
} }
} }

View File

@ -412,17 +412,10 @@ class BlockReaderLocal implements BlockReader {
public synchronized int read(ByteBuffer buf) throws IOException { public synchronized int read(ByteBuffer buf) throws IOException {
boolean canSkipChecksum = createNoChecksumContext(); boolean canSkipChecksum = createNoChecksumContext();
try { try {
String traceString = null; String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
if (LOG.isTraceEnabled()) { + "canSkipChecksum={})";
traceString = new StringBuilder(). LOG.trace(traceFormatStr + ": starting",
append("read("). buf.remaining(), block, filename, canSkipChecksum);
append("buf.remaining=").append(buf.remaining()).
append(", block=").append(block).
append(", filename=").append(filename).
append(", canSkipChecksum=").append(canSkipChecksum).
append(")").toString();
LOG.info(traceString + ": starting");
}
int nRead; int nRead;
try { try {
if (canSkipChecksum && zeroReadaheadRequested) { if (canSkipChecksum && zeroReadaheadRequested) {
@ -431,14 +424,12 @@ class BlockReaderLocal implements BlockReader {
nRead = readWithBounceBuffer(buf, canSkipChecksum); nRead = readWithBounceBuffer(buf, canSkipChecksum);
} }
} catch (IOException e) { } catch (IOException e) {
if (LOG.isTraceEnabled()) { LOG.trace(traceFormatStr + ": I/O error",
LOG.info(traceString + ": I/O error", e); buf.remaining(), block, filename, canSkipChecksum, e);
}
throw e; throw e;
} }
if (LOG.isTraceEnabled()) { LOG.trace(traceFormatStr + ": returning {}",
LOG.info(traceString + ": returning " + nRead); buf.remaining(), block, filename, canSkipChecksum, nRead);
}
return nRead; return nRead;
} finally { } finally {
if (canSkipChecksum) releaseNoChecksumContext(); if (canSkipChecksum) releaseNoChecksumContext();
@ -490,10 +481,8 @@ class BlockReaderLocal implements BlockReader {
} }
dataBuf.limit(dataBuf.position()); dataBuf.limit(dataBuf.position());
dataBuf.position(Math.min(dataBuf.position(), slop)); dataBuf.position(Math.min(dataBuf.position(), slop));
if (LOG.isTraceEnabled()) { LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + dataBuf.remaining(), oldDataPos, block);
"buffer from offset " + oldDataPos + " of " + block);
}
return dataBuf.limit() != maxReadaheadLength; return dataBuf.limit() != maxReadaheadLength;
} }
@ -565,18 +554,10 @@ class BlockReaderLocal implements BlockReader {
boolean canSkipChecksum = createNoChecksumContext(); boolean canSkipChecksum = createNoChecksumContext();
int nRead; int nRead;
try { try {
String traceString = null; final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
if (LOG.isTraceEnabled()) { + "filename={}, block={}, canSkipChecksum={})";
traceString = new StringBuilder(). LOG.trace(traceFormatStr + ": starting",
append("read(arr.length=").append(arr.length). arr.length, off, len, filename, block, canSkipChecksum);
append(", off=").append(off).
append(", len=").append(len).
append(", filename=").append(filename).
append(", block=").append(block).
append(", canSkipChecksum=").append(canSkipChecksum).
append(")").toString();
LOG.trace(traceString + ": starting");
}
try { try {
if (canSkipChecksum && zeroReadaheadRequested) { if (canSkipChecksum && zeroReadaheadRequested) {
nRead = readWithoutBounceBuffer(arr, off, len); nRead = readWithoutBounceBuffer(arr, off, len);
@ -584,14 +565,12 @@ class BlockReaderLocal implements BlockReader {
nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
} }
} catch (IOException e) { } catch (IOException e) {
if (LOG.isTraceEnabled()) { LOG.trace(traceFormatStr + ": I/O error",
LOG.trace(traceString + ": I/O error", e); arr.length, off, len, filename, block, canSkipChecksum, e);
}
throw e; throw e;
} }
if (LOG.isTraceEnabled()) { LOG.trace(traceFormatStr + ": returning {}",
LOG.trace(traceString + ": returning " + nRead); arr.length, off, len, filename, block, canSkipChecksum, nRead);
}
} finally { } finally {
if (canSkipChecksum) releaseNoChecksumContext(); if (canSkipChecksum) releaseNoChecksumContext();
} }
@ -634,11 +613,9 @@ class BlockReaderLocal implements BlockReader {
dataBuf.position(dataBuf.position() + discardedFromBuf); dataBuf.position(dataBuf.position() + discardedFromBuf);
remaining -= discardedFromBuf; remaining -= discardedFromBuf;
} }
if (LOG.isTraceEnabled()) { LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + + "dataBuf and advanced dataPos by {}",
filename + "): discarded " + discardedFromBuf + " bytes from " + n, block, filename, discardedFromBuf, remaining);
"dataBuf and advanced dataPos by " + remaining);
}
dataPos += remaining; dataPos += remaining;
return n; return n;
} }
@ -653,9 +630,7 @@ class BlockReaderLocal implements BlockReader {
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) return; if (closed) return;
closed = true; closed = true;
if (LOG.isTraceEnabled()) { LOG.trace("close(filename={}, block={})", filename, block);
LOG.trace("close(filename=" + filename + ", block=" + block + ")");
}
replica.unref(); replica.unref();
freeDataBufIfExists(); freeDataBufIfExists();
freeChecksumBufIfExists(); freeChecksumBufIfExists();
@ -705,11 +680,9 @@ class BlockReaderLocal implements BlockReader {
(opts.contains(ReadOption.SKIP_CHECKSUMS) == false); (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
if (anchor) { if (anchor) {
if (!createNoChecksumContext()) { if (!createNoChecksumContext()) {
if (LOG.isTraceEnabled()) { LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
LOG.trace("can't get an mmap for " + block + " of " + filename + + "given, we aren't skipping checksums, and the block is not "
" since SKIP_CHECKSUMS was not given, " + + "mlocked.", block, filename);
"we aren't skipping checksums, and the block is not mlocked.");
}
return null; return null;
} }
} }

View File

@ -221,11 +221,9 @@ class BlockReaderLocalLegacy implements BlockReader {
File blkfile = new File(pathinfo.getBlockPath()); File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile); dataIn = new FileInputStream(blkfile);
if (LOG.isDebugEnabled()) { LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset "
LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size " + "{} length {} short circuit checksum {}",
+ blkfile.length() + " startOffset " + startOffset + " length " blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);
+ length + " short circuit checksum " + !skipChecksumCheck);
}
if (!skipChecksumCheck) { if (!skipChecksumCheck) {
// get the metadata file // get the metadata file
@ -292,9 +290,7 @@ class BlockReaderLocalLegacy implements BlockReader {
// channel for the DataNode to notify the client that the path has been // channel for the DataNode to notify the client that the path has been
// invalidated. Therefore, our only option is to skip caching. // invalidated. Therefore, our only option is to skip caching.
if (pathinfo != null && !storageType.isTransient()) { if (pathinfo != null && !storageType.isTransient()) {
if (LOG.isDebugEnabled()) { LOG.debug("Cached location of block {} as {}", blk, pathinfo);
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
}
localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo); localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
} }
} catch (IOException e) { } catch (IOException e) {
@ -603,9 +599,7 @@ class BlockReaderLocalLegacy implements BlockReader {
@Override @Override
public synchronized int read(byte[] buf, int off, int len) throws IOException { public synchronized int read(byte[] buf, int off, int len) throws IOException {
if (LOG.isTraceEnabled()) { LOG.trace("read off {} len {}", off, len);
LOG.trace("read off " + off + " len " + len);
}
if (!verifyChecksum) { if (!verifyChecksum) {
return dataIn.read(buf, off, len); return dataIn.read(buf, off, len);
} }
@ -624,9 +618,7 @@ class BlockReaderLocalLegacy implements BlockReader {
@Override @Override
public synchronized long skip(long n) throws IOException { public synchronized long skip(long n) throws IOException {
if (LOG.isDebugEnabled()) { LOG.debug("skip {}", n);
LOG.debug("skip " + n);
}
if (n <= 0) { if (n <= 0) {
return 0; return 0;
} }

View File

@ -419,9 +419,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
final int idx = r.nextInt(localInterfaceAddrs.length); final int idx = r.nextInt(localInterfaceAddrs.length);
final SocketAddress addr = localInterfaceAddrs[idx]; final SocketAddress addr = localInterfaceAddrs[idx];
if (LOG.isDebugEnabled()) { LOG.debug("Using local interface {}", addr);
LOG.debug("Using local interface " + addr);
}
return addr; return addr;
} }
@ -1216,9 +1214,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
InetSocketAddress[] favoredNodes) throws IOException { InetSocketAddress[] favoredNodes) throws IOException {
checkOpen(); checkOpen();
final FsPermission masked = applyUMask(permission); final FsPermission masked = applyUMask(permission);
if(LOG.isDebugEnabled()) { LOG.debug("{}: masked={}", src, masked);
LOG.debug(src + ": masked=" + masked);
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress, src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt), buffersize, dfsClientConf.createChecksum(checksumOpt),
@ -1815,10 +1811,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
smallBufferSize)); smallBufferSize));
in = new DataInputStream(pair.in); in = new DataInputStream(pair.in);
if (LOG.isDebugEnabled()) { LOG.debug("write to {}: {}, block={}",
LOG.debug("write to " + datanodes[j] + ": " datanodes[j], Op.BLOCK_CHECKSUM, block);
+ Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5 // get block MD5
new Sender(out).blockChecksum(block, lb.getBlockToken()); new Sender(out).blockChecksum(block, lb.getBlockToken());
@ -1882,12 +1876,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} catch (InvalidBlockTokenException ibte) { } catch (InvalidBlockTokenException ibte) {
if (i > lastRetriedIndex) { if (i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) { LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " + "for file {} for block {} from datanode {}. Will retry the "
+ "for file " + src + " for block " + block + "block once.",
+ " from datanode " + datanodes[j] src, block, datanodes[j]);
+ ". Will retry the block once.");
}
lastRetriedIndex = i; lastRetriedIndex = i;
done = true; // actually it's not done; but we'll retry done = true; // actually it's not done; but we'll retry
i--; // repeat at i-th block i--; // repeat at i-th block
@ -1941,9 +1933,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
try { try {
sock = socketFactory.createSocket(); sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname()); String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode {}", dnAddr);
LOG.debug("Connecting to datanode " + dnAddr);
}
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout); NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
@ -2563,9 +2553,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
absPermission = applyUMask(null); absPermission = applyUMask(null);
} }
if(LOG.isDebugEnabled()) { LOG.debug("{}: masked={}", src, absPermission);
LOG.debug(src + ": masked=" + absPermission);
}
TraceScope scope = tracer.newScope("mkdir"); TraceScope scope = tracer.newScope("mkdir");
try { try {
return namenode.mkdirs(src, absPermission, createParent); return namenode.mkdirs(src, absPermission, createParent);
@ -3061,9 +3049,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
}); });
HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
if (LOG.isDebugEnabled()) { LOG.debug("Using hedged reads; pool threads={}", num);
LOG.debug("Using hedged reads; pool threads=" + num);
}
} }
ThreadPoolExecutor getHedgedReadsThreadPool() { ThreadPoolExecutor getHedgedReadsThreadPool() {

View File

@ -315,9 +315,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
if (locatedBlocks == null || refresh) { if (locatedBlocks == null || refresh) {
newInfo = dfsClient.getLocatedBlocks(src, 0); newInfo = dfsClient.getLocatedBlocks(src, 0);
} }
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = {}", newInfo);
DFSClient.LOG.debug("newInfo = " + newInfo);
}
if (newInfo == null) { if (newInfo == null) {
throw new IOException("Cannot open filename " + src); throw new IOException("Cannot open filename " + src);
} }
@ -383,10 +381,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
replicaNotFoundCount--; replicaNotFoundCount--;
} }
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode " + " for block {}", datanode, locatedblock.getBlock(), ioe);
+ datanode + " for block " + locatedblock.getBlock(), ioe);
}
} finally { } finally {
if (cdp != null) { if (cdp != null) {
RPC.stopProxy(cdp); RPC.stopProxy(cdp);
@ -1067,9 +1063,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
final String dnAddr = final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType); return new DNAddrPair(chosenNode, targetAddr, storageType);
} }
@ -1309,11 +1303,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
future.get(); future.get();
return; return;
} }
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis() + "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
+ "ms to read from " + chosenNode.info
+ "; spawning hedged read");
}
// Ignore this node on next go around. // Ignore this node on next go around.
ignored.add(chosenNode.info); ignored.add(chosenNode.info);
dfsClient.getHedgedReadMetrics().incHedgedReadOps(); dfsClient.getHedgedReadMetrics().incHedgedReadOps();
@ -1340,10 +1331,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
.submit(getFromDataNodeCallable); .submit(getFromDataNodeCallable);
futures.add(oneMoreRequest); futures.add(oneMoreRequest);
} catch (IOException ioe) { } catch (IOException ioe) {
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Failed getting node for hedged read: {}",
DFSClient.LOG.debug("Failed getting node for hedged read: " ioe.getMessage());
+ ioe.getMessage());
}
} }
// if not succeeded. Submit callables for each datanode in a loop, wait // if not succeeded. Submit callables for each datanode in a loop, wait
// for a fixed interval and get the result from the fastest one. // for a fixed interval and get the result from the fastest one.
@ -1599,11 +1588,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throw new IOException(errMsg); throw new IOException(errMsg);
} }
} catch (IOException e) {//make following read to retry } catch (IOException e) {//make following read to retry
if(DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
DFSClient.LOG.debug("Exception while seek to " + targetPos + "{}", targetPos, getCurrentBlock(), src, currentNode, e);
+ " from " + getCurrentBlock() + " of " + src + " from "
+ currentNode, e);
}
} }
} }
} }
@ -1819,20 +1805,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} else { } else {
length63 = 1 + curEnd - curPos; length63 = 1 + curEnd - curPos;
if (length63 <= 0) { if (length63 <= 0) {
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " + + " of {}; {} bytes left in block. blockPos={}; curPos={};"
curPos + " of " + src + "; " + length63 + " bytes left in block. " + + "curEnd={}",
"blockPos=" + blockPos + "; curPos=" + curPos + curPos, src, length63, blockPos, curPos, curEnd);
"; curEnd=" + curEnd);
}
return null; return null;
} }
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going "
DFSClient.LOG.debug("Reducing read length from " + maxLength + + "more than one byte past the end of the block. blockPos={}; "
" to " + length63 + " to avoid going more than one byte " + +" curPos={}; curEnd={}",
"past the end of the block. blockPos=" + blockPos + maxLength, length63, blockPos, curPos, curEnd);
"; curPos=" + curPos + "; curEnd=" + curEnd);
}
} }
// Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer. // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
int length; int length;
@ -1846,28 +1828,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// So we can't mmap the parts of the block higher than the 2 GB offset. // So we can't mmap the parts of the block higher than the 2 GB offset.
// FIXME: we could work around this with multiple memory maps. // FIXME: we could work around this with multiple memory maps.
// See HDFS-5101. // See HDFS-5101.
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {} "
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " + + " of {}; 31-bit MappedByteBuffer limit exceeded. blockPos={}, "
curPos + " of " + src + "; 31-bit MappedByteBuffer limit " + + "curEnd={}", curPos, src, blockPos, curEnd);
"exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
}
return null; return null;
} }
length = (int)length31; length = (int)length31;
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit "
DFSClient.LOG.debug("Reducing read length from " + maxLength + + "limit. blockPos={}; curPos={}; curEnd={}",
" to " + length + " to avoid 31-bit limit. " + maxLength, length, blockPos, curPos, curEnd);
"blockPos=" + blockPos + "; curPos=" + curPos +
"; curEnd=" + curEnd);
}
} }
final ClientMmap clientMmap = blockReader.getClientMmap(opts); final ClientMmap clientMmap = blockReader.getClientMmap(opts);
if (clientMmap == null) { if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of"
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + + " {}; BlockReader#getClientMmap returned null.", curPos, src);
curPos + " of " + src + "; BlockReader#getClientMmap returned " +
"null.");
}
return null; return null;
} }
boolean success = false; boolean success = false;
@ -1881,11 +1855,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
synchronized (infoLock) { synchronized (infoLock) {
readStatistics.addZeroCopyBytes(length); readStatistics.addZeroCopyBytes(length);
} }
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
DFSClient.LOG.debug("readZeroCopy read " + length + + "zero-copy read path. blockEnd = {}", length, curPos, blockEnd);
" bytes from offset " + curPos + " via the zero-copy read " +
"path. blockEnd = " + blockEnd);
}
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -190,9 +190,9 @@ public class DFSOutputStream extends FSOutputSummer
this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.fileEncryptionInfo = stat.getFileEncryptionInfo();
this.cachingStrategy = new AtomicReference<CachingStrategy>( this.cachingStrategy = new AtomicReference<CachingStrategy>(
dfsClient.getDefaultWriteCachingStrategy()); dfsClient.getDefaultWriteCachingStrategy());
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { if (progress != null) {
DFSClient.LOG.debug( DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
"Set non-null progress callback on DFSOutputStream " + src); +"{}", src);
} }
this.bytesPerChecksum = checksum.getBytesPerChecksum(); this.bytesPerChecksum = checksum.getBytesPerChecksum();
@ -365,12 +365,9 @@ public class DFSOutputStream extends FSOutputSummer
final int chunkSize = csize + getChecksumSize(); final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1); chunksPerPacket = Math.max(bodySize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket; packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, "
DFSClient.LOG.debug("computePacketChunkSize: src=" + src + + "chunksPerPacket={}, packetSize={}",
", chunkSize=" + chunkSize + src, chunkSize, chunksPerPacket, packetSize);
", chunksPerPacket=" + chunksPerPacket +
", packetSize=" + packetSize);
}
} }
protected TraceScope createWriteTraceScope() { protected TraceScope createWriteTraceScope() {
@ -397,14 +394,10 @@ public class DFSOutputStream extends FSOutputSummer
if (currentPacket == null) { if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno={},"
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + + " src={}, packetSize={}, chunksPerPacket={}, bytesCurBlock={}",
currentPacket.getSeqno() + currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
", src=" + src + getStreamer().getBytesCurBlock());
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + getStreamer().getBytesCurBlock());
}
} }
currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeChecksum(checksum, ckoff, cklen);
@ -558,12 +551,9 @@ public class DFSOutputStream extends FSOutputSummer
int numKept = flushBuffer(!endBlock, true); int numKept = flushBuffer(!endBlock, true);
// bytesCurBlock potentially incremented if there was buffered data // bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient flush(): bytesCurBlock={}, "
DFSClient.LOG.debug("DFSClient flush(): " + "lastFlushOffset={}, createNewBlock={}",
+ " bytesCurBlock=" + getStreamer().getBytesCurBlock() getStreamer().getBytesCurBlock(), lastFlushOffset, endBlock);
+ " lastFlushOffset=" + lastFlushOffset
+ " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset. // Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != getStreamer().getBytesCurBlock()) { if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
assert getStreamer().getBytesCurBlock() > lastFlushOffset; assert getStreamer().getBytesCurBlock() > lastFlushOffset;

View File

@ -462,19 +462,13 @@ public class DFSUtilClient {
InetAddress addr = targetAddr.getAddress(); InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress()); Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) { if (cached != null) {
if (LOG.isTraceEnabled()) { LOG.trace("Address {} is {} local", targetAddr, (cached ? "" : "not"));
LOG.trace("Address " + targetAddr +
(cached ? " is local" : " is not local"));
}
return cached; return cached;
} }
boolean local = NetUtils.isLocalAddress(addr); boolean local = NetUtils.isLocalAddress(addr);
if (LOG.isTraceEnabled()) { LOG.trace("Address {} is {} local", targetAddr, (local ? "" : "not"));
LOG.trace("Address " + targetAddr +
(local ? " is local" : " is not local"));
}
localAddrMap.put(addr.getHostAddress(), local); localAddrMap.put(addr.getHostAddress(), local);
return local; return local;
} }

View File

@ -132,18 +132,14 @@ class DataStreamer extends Daemon {
final int length, final DFSClient client) throws IOException { final int length, final DFSClient client) throws IOException {
final DfsClientConf conf = client.getConf(); final DfsClientConf conf = client.getConf();
final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname()); final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode {}", dnAddr);
LOG.debug("Connecting to datanode " + dnAddr);
}
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
final Socket sock = client.socketFactory.createSocket(); final Socket sock = client.socketFactory.createSocket();
final int timeout = client.getDatanodeReadTimeout(length); final int timeout = client.getDatanodeReadTimeout(length);
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
sock.setSoTimeout(timeout); sock.setSoTimeout(timeout);
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
if (LOG.isDebugEnabled()) { LOG.debug("Send buf size {}", sock.getSendBufferSize());
LOG.debug("Send buf size " + sock.getSendBufferSize());
}
return sock; return sock;
} }
@ -484,9 +480,7 @@ class DataStreamer extends Daemon {
} }
private void endBlock() { private void endBlock() {
if(LOG.isDebugEnabled()) { LOG.debug("Closing old block {}", block);
LOG.debug("Closing old block " + block);
}
this.setName("DataStreamer for file " + src); this.setName("DataStreamer for file " + src);
closeResponder(); closeResponder();
closeStream(); closeStream();
@ -567,15 +561,11 @@ class DataStreamer extends Daemon {
// get new block from namenode. // get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if(LOG.isDebugEnabled()) { LOG.debug("Allocating new block");
LOG.debug("Allocating new block");
}
setPipeline(nextBlockOutputStream()); setPipeline(nextBlockOutputStream());
initDataStreaming(); initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(LOG.isDebugEnabled()) { LOG.debug("Append to block {}", block);
LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery(); setupPipelineForAppendOrRecovery();
if (streamerClosed) { if (streamerClosed) {
continue; continue;
@ -627,10 +617,7 @@ class DataStreamer extends Daemon {
} }
} }
if (LOG.isDebugEnabled()) { LOG.debug("DataStreamer block {} sending packet {}", block, one);
LOG.debug("DataStreamer block " + block +
" sending packet " + one);
}
// write out data to remote datanode // write out data to remote datanode
TraceScope writeScope = dfsClient.getTracer(). TraceScope writeScope = dfsClient.getTracer().
@ -741,9 +728,7 @@ class DataStreamer extends Daemon {
TraceScope scope = dfsClient.getTracer(). TraceScope scope = dfsClient.getTracer().
newScope("waitForAckedSeqno"); newScope("waitForAckedSeqno");
try { try {
if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ack for: {}", seqno);
LOG.debug("Waiting for ack for: " + seqno);
}
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
try { try {
synchronized (dataQueue) { synchronized (dataQueue) {
@ -955,8 +940,8 @@ class DataStreamer extends Daemon {
LOG.warn("Slow ReadProcessor read fields took " + duration LOG.warn("Slow ReadProcessor read fields took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+ ack + ", targets: " + Arrays.asList(targets)); + ack + ", targets: " + Arrays.asList(targets));
} else if (LOG.isDebugEnabled()) { } else {
LOG.debug("DFSClient " + ack); LOG.debug("DFSClient {}", ack);
} }
long seqno = ack.getSeqno(); long seqno = ack.getSeqno();
@ -1176,9 +1161,7 @@ class DataStreamer extends Daemon {
} }
private void addDatanode2ExistingPipeline() throws IOException { private void addDatanode2ExistingPipeline() throws IOException {
if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug("lastAckedSeqno = {}", lastAckedSeqno);
DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
}
/* /*
* Is data transfer necessary? We have the following cases. * Is data transfer necessary? We have the following cases.
* *
@ -1645,10 +1628,8 @@ class DataStreamer extends Daemon {
new HashSet<String>(Arrays.asList(favoredNodes)); new HashSet<String>(Arrays.asList(favoredNodes));
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
if (LOG.isDebugEnabled()) { LOG.debug("{} was chosen by name node (favored={}).",
LOG.debug(nodes[i].getXferAddrWithHostname() + nodes[i].getXferAddrWithHostname(), pinnings[i]);
" was chosen by name node (favored=" + pinnings[i] + ").");
}
} }
if (shouldLog && !favoredSet.isEmpty()) { if (shouldLog && !favoredSet.isEmpty()) {
// There is one or more favored nodes that were not allocated. // There is one or more favored nodes that were not allocated.
@ -1787,9 +1768,7 @@ class DataStreamer extends Daemon {
packet.addTraceParent(Tracer.getCurrentSpanId()); packet.addTraceParent(Tracer.getCurrentSpanId());
dataQueue.addLast(packet); dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno(); lastQueuedSeqno = packet.getSeqno();
if (LOG.isDebugEnabled()) { LOG.debug("Queued packet {}", packet.getSeqno());
LOG.debug("Queued packet " + packet.getSeqno());
}
dataQueue.notifyAll(); dataQueue.notifyAll();
} }
} }

View File

@ -250,9 +250,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
PacketHeader header = new PacketHeader(); PacketHeader header = new PacketHeader();
header.readFields(in); header.readFields(in);
if (LOG.isDebugEnabled()) { LOG.debug("DFSClient readChunk got header {}", header);
LOG.debug("DFSClient readChunk got header " + header);
}
// Sanity check the lengths // Sanity check the lengths
if (!header.sanityCheck(lastSeqNo)) { if (!header.sanityCheck(lastSeqNo)) {

View File

@ -135,14 +135,9 @@ public class RemoteBlockReader2 implements BlockReader {
@Override @Override
public synchronized int read(byte[] buf, int off, int len) public synchronized int read(byte[] buf, int off, int len)
throws IOException { throws IOException {
UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
UUID randomId = null; LOG.trace("Starting read #{} file {} from datanode {}",
if (LOG.isTraceEnabled()) { randomId, filename, datanodeID.getHostName());
randomId = UUID.randomUUID();
LOG.trace(String.format("Starting read #%s file %s from datanode %s",
randomId.toString(), this.filename,
this.datanodeID.getHostName()));
}
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
TraceScope scope = tracer.newScope( TraceScope scope = tracer.newScope(
@ -154,9 +149,7 @@ public class RemoteBlockReader2 implements BlockReader {
} }
} }
if (LOG.isTraceEnabled()) { LOG.trace("Finishing read #{}", randomId);
LOG.trace(String.format("Finishing read #" + randomId));
}
if (curDataSlice.remaining() == 0) { if (curDataSlice.remaining() == 0) {
// we're at EOF now // we're at EOF now
@ -203,9 +196,7 @@ public class RemoteBlockReader2 implements BlockReader {
curDataSlice = packetReceiver.getDataSlice(); curDataSlice = packetReceiver.getDataSlice();
assert curDataSlice.capacity() == curHeader.getDataLen(); assert curDataSlice.capacity() == curHeader.getDataLen();
if (LOG.isTraceEnabled()) { LOG.trace("DFSClient readNextPacket got header {}", curHeader);
LOG.trace("DFSClient readNextPacket got header " + curHeader);
}
// Sanity check the lengths // Sanity check the lengths
if (!curHeader.sanityCheck(lastSeqNo)) { if (!curHeader.sanityCheck(lastSeqNo)) {
@ -276,9 +267,7 @@ public class RemoteBlockReader2 implements BlockReader {
} }
private void readTrailingEmptyPacket() throws IOException { private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) { LOG.trace("Reading empty packet at end of read");
LOG.trace("Reading empty packet at end of read");
}
packetReceiver.receiveNextPacket(in); packetReceiver.receiveNextPacket(in);

View File

@ -308,10 +308,7 @@ public class LeaseRenewer {
} }
LeaseRenewer.this.run(id); LeaseRenewer.this.run(id);
} catch(InterruptedException e) { } catch(InterruptedException e) {
if (LOG.isDebugEnabled()) { LOG.debug("LeaseRenewer is interrupted.", e);
LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+ " is interrupted.", e);
}
} finally { } finally {
synchronized(LeaseRenewer.this) { synchronized(LeaseRenewer.this) {
Factory.INSTANCE.remove(LeaseRenewer.this); Factory.INSTANCE.remove(LeaseRenewer.this);
@ -399,9 +396,7 @@ public class LeaseRenewer {
} }
if (daemonCopy != null) { if (daemonCopy != null) {
if(LOG.isDebugEnabled()) { LOG.debug("Wait for lease checker to terminate");
LOG.debug("Wait for lease checker to terminate");
}
daemonCopy.join(); daemonCopy.join();
} }
} }
@ -424,16 +419,11 @@ public class LeaseRenewer {
//skip if current client name is the same as the previous name. //skip if current client name is the same as the previous name.
if (!c.getClientName().equals(previousName)) { if (!c.getClientName().equals(previousName)) {
if (!c.renewLease()) { if (!c.renewLease()) {
if (LOG.isDebugEnabled()) { LOG.debug("Did not renew lease for client {}", c);
LOG.debug("Did not renew lease for client " +
c);
}
continue; continue;
} }
previousName = c.getClientName(); previousName = c.getClientName();
if (LOG.isDebugEnabled()) { LOG.debug("Lease renewed for client {}", previousName);
LOG.debug("Lease renewed for client " + previousName);
}
} }
} }
} }

View File

@ -147,10 +147,8 @@ public class PacketReceiver implements Closeable {
throw new IOException("Invalid header length " + headerLen); throw new IOException("Invalid header length " + headerLen);
} }
if (LOG.isTraceEnabled()) { LOG.trace("readNextPacket: dataPlusChecksumLen={}, headerLen={}",
LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen + dataPlusChecksumLen, headerLen);
" headerLen = " + headerLen);
}
// Sanity check the buffer size so we don't allocate too much memory // Sanity check the buffer size so we don't allocate too much memory
// and OOME. // and OOME.

View File

@ -73,10 +73,8 @@ public class Sender implements DataTransferProtocol {
private static void send(final DataOutputStream out, final Op opcode, private static void send(final DataOutputStream out, final Op opcode,
final Message proto) throws IOException { final Message proto) throws IOException {
if (LOG.isTraceEnabled()) { LOG.trace("Sending DataTransferOp {}: {}",
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName() proto.getClass().getSimpleName(), proto);
+ ": " + proto);
}
op(out, opcode); op(out, opcode);
proto.writeDelimitedTo(out); proto.writeDelimitedTo(out);
out.flush(); out.flush();

View File

@ -332,10 +332,8 @@ public final class DataTransferSaslUtil {
public static IOStreamPair createStreamPair(Configuration conf, public static IOStreamPair createStreamPair(Configuration conf,
CipherOption cipherOption, OutputStream out, InputStream in, CipherOption cipherOption, OutputStream out, InputStream in,
boolean isServer) throws IOException { boolean isServer) throws IOException {
if (LOG.isDebugEnabled()) { LOG.debug("Creating IOStreamPair of CryptoInputStream and "
LOG.debug("Creating IOStreamPair of CryptoInputStream and " + + "CryptoOutputStream.");
"CryptoOutputStream.");
}
CryptoCodec codec = CryptoCodec.getInstance(conf, CryptoCodec codec = CryptoCodec.getInstance(conf,
cipherOption.getCipherSuite()); cipherOption.getCipherSuite());
byte[] inKey = cipherOption.getInKey(); byte[] inKey = cipherOption.getInKey();

View File

@ -130,9 +130,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
throws IOException { throws IOException {
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
}
rpcProxy = createClientDatanodeProtocolProxy(addr, rpcProxy = createClientDatanodeProtocolProxy(addr,
UserGroupInformation.getCurrentUser(), conf, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout); NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@ -143,9 +141,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) { LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
}
// Since we're creating a new UserGroupInformation here, we know that no // Since we're creating a new UserGroupInformation here, we know that no
// future RPC proxies will be able to re-use the same connection. And // future RPC proxies will be able to re-use the same connection. And

View File

@ -129,18 +129,13 @@ public class DfsClientShmManager implements Closeable {
ShmId shmId = shm.getShmId(); ShmId shmId = shm.getShmId();
Slot slot = shm.allocAndRegisterSlot(blockId); Slot slot = shm.allocAndRegisterSlot(blockId);
if (shm.isFull()) { if (shm.isFull()) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: pulled the last slot {} out of {}",
LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() + this, slot.getSlotIdx(), shm);
" out of " + shm);
}
DfsClientShm removedShm = notFull.remove(shmId); DfsClientShm removedShm = notFull.remove(shmId);
Preconditions.checkState(removedShm == shm); Preconditions.checkState(removedShm == shm);
full.put(shmId, shm); full.put(shmId, shm);
} else { } else {
if (LOG.isTraceEnabled()) { LOG.trace("{}: pulled slot {} out of {}", this, slot.getSlotIdx(), shm);
LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
" out of " + shm);
}
} }
return slot; return slot;
} }
@ -187,9 +182,7 @@ public class DfsClientShmManager implements Closeable {
DfsClientShm shm = DfsClientShm shm =
new DfsClientShm(PBHelperClient.convert(resp.getId()), new DfsClientShm(PBHelperClient.convert(resp.getId()),
fis[0], this, peer); fis[0], this, peer);
if (LOG.isTraceEnabled()) { LOG.trace("{}: createNewShm: created {}", this, shm);
LOG.trace(this + ": createNewShm: created " + shm);
}
return shm; return shm;
} finally { } finally {
try { try {
@ -234,15 +227,11 @@ public class DfsClientShmManager implements Closeable {
String clientName, ExtendedBlockId blockId) throws IOException { String clientName, ExtendedBlockId blockId) throws IOException {
while (true) { while (true) {
if (closed) { if (closed) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: the DfsClientShmManager has been closed.", this);
LOG.trace(this + ": the DfsClientShmManager has been closed.");
}
return null; return null;
} }
if (disabled) { if (disabled) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: shared memory segment access is disabled.", this);
LOG.trace(this + ": shared memory segment access is disabled.");
}
return null; return null;
} }
// Try to use an existing slot. // Try to use an existing slot.
@ -253,9 +242,7 @@ public class DfsClientShmManager implements Closeable {
// There are no free slots. If someone is loading more slots, wait // There are no free slots. If someone is loading more slots, wait
// for that to finish. // for that to finish.
if (loading) { if (loading) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: waiting for loading to finish...", this);
LOG.trace(this + ": waiting for loading to finish...");
}
finishedLoading.awaitUninterruptibly(); finishedLoading.awaitUninterruptibly();
} else { } else {
// Otherwise, load the slot ourselves. // Otherwise, load the slot ourselves.
@ -282,11 +269,9 @@ public class DfsClientShmManager implements Closeable {
// fired and marked the shm as disconnected. In this case, we // fired and marked the shm as disconnected. In this case, we
// obviously don't want to add the SharedMemorySegment to our list // obviously don't want to add the SharedMemorySegment to our list
// of valid not-full segments. // of valid not-full segments.
if (LOG.isDebugEnabled()) { LOG.debug("{}: the UNIX domain socket associated with this "
LOG.debug(this + ": the UNIX domain socket associated with " + + "short-circuit memory closed before we could make use of "
"this short-circuit memory closed before we could make " + + "the shm.", this);
"use of the shm.");
}
} else { } else {
notFull.put(shm.getShmId(), shm); notFull.put(shm.getShmId(), shm);
} }
@ -309,9 +294,7 @@ public class DfsClientShmManager implements Closeable {
Preconditions.checkState(!full.containsKey(shm.getShmId())); Preconditions.checkState(!full.containsKey(shm.getShmId()));
Preconditions.checkState(!notFull.containsKey(shm.getShmId())); Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
if (shm.isEmpty()) { if (shm.isEmpty()) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: freeing empty stale {}", this, shm);
LOG.trace(this + ": freeing empty stale " + shm);
}
shm.free(); shm.free();
} }
} else { } else {
@ -336,10 +319,8 @@ public class DfsClientShmManager implements Closeable {
// lowest ID, but it could still occur. In most workloads, // lowest ID, but it could still occur. In most workloads,
// fragmentation should not be a major concern, since it doesn't impact // fragmentation should not be a major concern, since it doesn't impact
// peak file descriptor usage or the speed of allocation. // peak file descriptor usage or the speed of allocation.
if (LOG.isTraceEnabled()) { LOG.trace("{}: shutting down UNIX domain socket for empty {}",
LOG.trace(this + ": shutting down UNIX domain socket for " + this, shm);
"empty " + shm);
}
shutdown(shm); shutdown(shm);
} else { } else {
notFull.put(shmId, shm); notFull.put(shmId, shm);

View File

@ -103,9 +103,7 @@ public class ShortCircuitCache implements Closeable {
if (ShortCircuitCache.this.closed) return; if (ShortCircuitCache.this.closed) return;
long curMs = Time.monotonicNow(); long curMs = Time.monotonicNow();
if (LOG.isDebugEnabled()) { LOG.debug("{}: cache cleaner running at {}", this, curMs);
LOG.debug(this + ": cache cleaner running at " + curMs);
}
int numDemoted = demoteOldEvictableMmaped(curMs); int numDemoted = demoteOldEvictableMmaped(curMs);
int numPurged = 0; int numPurged = 0;
@ -127,11 +125,9 @@ public class ShortCircuitCache implements Closeable {
numPurged++; numPurged++;
} }
if (LOG.isDebugEnabled()) { LOG.debug("{}: finishing cache cleaner run started at {}. Demoted {} "
LOG.debug(this + ": finishing cache cleaner run started at " + + "mmapped replicas; purged {} replicas.",
curMs + ". Demoted " + numDemoted + " mmapped replicas; " + this, curMs, numDemoted, numPurged);
"purged " + numPurged + " replicas.");
}
} finally { } finally {
ShortCircuitCache.this.lock.unlock(); ShortCircuitCache.this.lock.unlock();
} }
@ -186,9 +182,7 @@ public class ShortCircuitCache implements Closeable {
@Override @Override
public void run() { public void run() {
if (LOG.isTraceEnabled()) { LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
}
final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath(); final String path = shmSock.getPath();
@ -205,9 +199,7 @@ public class ShortCircuitCache implements Closeable {
String error = resp.hasError() ? resp.getError() : "(unknown)"; String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error); throw new IOException(resp.getStatus().toString() + ": " + error);
} }
if (LOG.isTraceEnabled()) { LOG.trace("{}: released {}", this, slot);
LOG.trace(ShortCircuitCache.this + ": released " + slot);
}
success = true; success = true;
} catch (IOException e) { } catch (IOException e) {
LOG.error(ShortCircuitCache.this + ": failed to release " + LOG.error(ShortCircuitCache.this + ": failed to release " +
@ -433,9 +425,7 @@ public class ShortCircuitCache implements Closeable {
purgeReason = "purging replica because it is stale."; purgeReason = "purging replica because it is stale.";
} }
if (purgeReason != null) { if (purgeReason != null) {
if (LOG.isDebugEnabled()) { LOG.debug("{}: {}", this, purgeReason);
LOG.debug(this + ": " + purgeReason);
}
purge(replica); purge(replica);
} }
} }
@ -677,10 +667,8 @@ public class ShortCircuitCache implements Closeable {
ShortCircuitReplicaInfo info = null; ShortCircuitReplicaInfo info = null;
do { do {
if (closed) { if (closed) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
LOG.trace(this + ": can't fetchOrCreate " + key + this, key);
" because the cache is closed.");
}
return null; return null;
} }
Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key); Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
@ -688,9 +676,7 @@ public class ShortCircuitCache implements Closeable {
try { try {
info = fetch(key, waitable); info = fetch(key, waitable);
} catch (RetriableException e) { } catch (RetriableException e) {
if (LOG.isDebugEnabled()) { LOG.debug("{}: retrying {}", this, e.getMessage());
LOG.debug(this + ": retrying " + e.getMessage());
}
continue; continue;
} }
} }
@ -721,9 +707,7 @@ public class ShortCircuitCache implements Closeable {
// ShortCircuitReplica. So we simply wait for it to complete. // ShortCircuitReplica. So we simply wait for it to complete.
ShortCircuitReplicaInfo info; ShortCircuitReplicaInfo info;
try { try {
if (LOG.isTraceEnabled()) { LOG.trace("{}: found waitable for {}", this, key);
LOG.trace(this + ": found waitable for " + key);
}
info = waitable.await(); info = waitable.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info(this + ": interrupted while waiting for " + key); LOG.info(this + ": interrupted while waiting for " + key);
@ -765,9 +749,7 @@ public class ShortCircuitCache implements Closeable {
// Handle loading a new replica. // Handle loading a new replica.
ShortCircuitReplicaInfo info = null; ShortCircuitReplicaInfo info = null;
try { try {
if (LOG.isTraceEnabled()) { LOG.trace("{}: loading {}", this, key);
LOG.trace(this + ": loading " + key);
}
info = creator.createShortCircuitReplicaInfo(); info = creator.createShortCircuitReplicaInfo();
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.warn(this + ": failed to load " + key, e); LOG.warn(this + ": failed to load " + key, e);
@ -777,9 +759,7 @@ public class ShortCircuitCache implements Closeable {
try { try {
if (info.getReplica() != null) { if (info.getReplica() != null) {
// On success, make sure the cache cleaner thread is running. // On success, make sure the cache cleaner thread is running.
if (LOG.isTraceEnabled()) { LOG.trace("{}: successfully loaded {}", this, info.getReplica());
LOG.trace(this + ": successfully loaded " + info.getReplica());
}
startCacheCleanerThreadIfNeeded(); startCacheCleanerThreadIfNeeded();
// Note: new ShortCircuitReplicas start with a refCount of 2, // Note: new ShortCircuitReplicas start with a refCount of 2,
// indicating that both this cache and whoever requested the // indicating that both this cache and whoever requested the
@ -811,10 +791,8 @@ public class ShortCircuitCache implements Closeable {
cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs, cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
cacheCleaner.setFuture(future); cacheCleaner.setFuture(future);
if (LOG.isDebugEnabled()) { LOG.debug("{}: starting cache cleaner thread which will run every {} ms",
LOG.debug(this + ": starting cache cleaner thread which will run " + this, rateMs);
"every " + rateMs + " ms");
}
} }
} }
@ -832,17 +810,12 @@ public class ShortCircuitCache implements Closeable {
long lastAttemptTimeMs = (Long)replica.mmapData; long lastAttemptTimeMs = (Long)replica.mmapData;
long delta = Time.monotonicNow() - lastAttemptTimeMs; long delta = Time.monotonicNow() - lastAttemptTimeMs;
if (delta < mmapRetryTimeoutMs) { if (delta < mmapRetryTimeoutMs) {
if (LOG.isTraceEnabled()) { LOG.trace("{}: can't create client mmap for {} because we failed to"
LOG.trace(this + ": can't create client mmap for " + + " create one just {}ms ago.", this, replica, delta);
replica + " because we failed to " +
"create one just " + delta + "ms ago.");
}
return null; return null;
} }
if (LOG.isTraceEnabled()) { LOG.trace("{}: retrying client mmap for {}, {} ms after the previous "
LOG.trace(this + ": retrying client mmap for " + replica + + "failure.", this, replica, delta);
", " + delta + " ms after the previous failure.");
}
} else if (replica.mmapData instanceof Condition) { } else if (replica.mmapData instanceof Condition) {
Condition cond = (Condition)replica.mmapData; Condition cond = (Condition)replica.mmapData;
cond.awaitUninterruptibly(); cond.awaitUninterruptibly();
@ -965,38 +938,10 @@ public class ShortCircuitCache implements Closeable {
} }
} }
} }
if (LOG.isDebugEnabled()) { LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, "
StringBuilder builder = new StringBuilder(); + "failedLoads={}, evictable={}, evictableMmapped={}",
builder.append("visiting ").append(visitor.getClass().getName()). visitor.getClass().getName(), outstandingMmapCount, replicas,
append("with outstandingMmapCount=").append(outstandingMmapCount). failedLoads, evictable, evictableMmapped);
append(", replicas=");
String prefix = "";
for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
builder.append(prefix).append(entry.getValue());
prefix = ",";
}
prefix = "";
builder.append(", failedLoads=");
for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
builder.append(prefix).append(entry.getValue());
prefix = ",";
}
prefix = "";
builder.append(", evictable=");
for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
builder.append(prefix).append(entry.getKey()).
append(":").append(entry.getValue());
prefix = ",";
}
prefix = "";
builder.append(", evictableMmapped=");
for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
builder.append(prefix).append(entry.getKey()).
append(":").append(entry.getValue());
prefix = ",";
}
LOG.debug(builder.toString());
}
visitor.visit(outstandingMmapCount, replicas, failedLoads, visitor.visit(outstandingMmapCount, replicas, failedLoads,
evictable, evictableMmapped); evictable, evictableMmapped);
} finally { } finally {

View File

@ -154,25 +154,19 @@ public class ShortCircuitReplica {
// Check staleness by looking at the shared memory area we use to // Check staleness by looking at the shared memory area we use to
// communicate with the DataNode. // communicate with the DataNode.
boolean stale = !slot.isValid(); boolean stale = !slot.isValid();
if (LOG.isTraceEnabled()) { LOG.trace("{}: checked shared memory segment. isStale={}", this, stale);
LOG.trace(this + ": checked shared memory segment. isStale=" + stale);
}
return stale; return stale;
} else { } else {
// Fall back to old, time-based staleness method. // Fall back to old, time-based staleness method.
long deltaMs = Time.monotonicNow() - creationTimeMs; long deltaMs = Time.monotonicNow() - creationTimeMs;
long staleThresholdMs = cache.getStaleThresholdMs(); long staleThresholdMs = cache.getStaleThresholdMs();
if (deltaMs > staleThresholdMs) { if (deltaMs > staleThresholdMs) {
if (LOG.isTraceEnabled()) { LOG.trace("{} is stale because it's {} ms old and staleThreadholdMS={}",
LOG.trace(this + " is stale because it's " + deltaMs + this, deltaMs, staleThresholdMs);
" ms old, and staleThresholdMs = " + staleThresholdMs);
}
return true; return true;
} else { } else {
if (LOG.isTraceEnabled()) { LOG.trace("{} is not stale because it's only {} ms old "
LOG.trace(this + " is not stale because it's only " + deltaMs + + "and staleThresholdMs={}", this, deltaMs, staleThresholdMs);
" ms old, and staleThresholdMs = " + staleThresholdMs);
}
return false; return false;
} }
} }
@ -194,13 +188,8 @@ public class ShortCircuitReplica {
return false; return false;
} }
boolean result = slot.addAnchor(); boolean result = slot.addAnchor();
if (LOG.isTraceEnabled()) { LOG.trace("{}: {} no-checksum anchor to slot {}",
if (result) { this, result ? "added" : "could not add", slot);
LOG.trace(this + ": added no-checksum anchor to slot " + slot);
} else {
LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
}
}
return result; return result;
} }
@ -263,9 +252,7 @@ public class ShortCircuitReplica {
suffix += " scheduling " + slot + " for later release."; suffix += " scheduling " + slot + " for later release.";
} }
} }
if (LOG.isTraceEnabled()) { LOG.trace("closed {}{}", this, suffix);
LOG.trace("closed " + this + suffix);
}
} }
public FileInputStream getDataStream() { public FileInputStream getDataStream() {
@ -293,9 +280,7 @@ public class ShortCircuitReplica {
FileChannel channel = dataStream.getChannel(); FileChannel channel = dataStream.getChannel();
MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
Math.min(Integer.MAX_VALUE, channel.size())); Math.min(Integer.MAX_VALUE, channel.size()));
if (LOG.isTraceEnabled()) { LOG.trace("{}: created mmap of size {}", this, channel.size());
LOG.trace(this + ": created mmap of size " + channel.size());
}
return mmap; return mmap;
} catch (IOException e) { } catch (IOException e) {
LOG.warn(this + ": mmap error", e); LOG.warn(this + ": mmap error", e);

View File

@ -484,13 +484,9 @@ public class ShortCircuitShm {
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
this.allocatedSlots = new BitSet(slots.length); this.allocatedSlots = new BitSet(slots.length);
if (LOG.isTraceEnabled()) { LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
LOG.trace("creating " + this.getClass().getSimpleName() + + "slots.length={})", this.getClass().getSimpleName(), shmId,
"(shmId=" + shmId + mmappedLength, String.format("%x", baseAddress), slots.length);
", mmappedLength=" + mmappedLength +
", baseAddress=" + String.format("%x", baseAddress) +
", slots.length=" + slots.length + ")");
}
} }
public final ShmId getShmId() { public final ShmId getShmId() {
@ -615,9 +611,7 @@ public class ShortCircuitShm {
"tried to unregister slot " + slotIdx + ", which was not registered."); "tried to unregister slot " + slotIdx + ", which was not registered.");
allocatedSlots.set(slotIdx, false); allocatedSlots.set(slotIdx, false);
slots[slotIdx] = null; slots[slotIdx] = null;
if (LOG.isTraceEnabled()) { LOG.trace("{}: unregisterSlot {}", this, slotIdx);
LOG.trace(this + ": unregisterSlot " + slotIdx);
}
} }
/** /**

View File

@ -36,18 +36,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class ByteArrayManager { public abstract class ByteArrayManager {
static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class); static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE =
new ThreadLocal<StringBuilder>() {
protected StringBuilder initialValue() {
return new StringBuilder();
}
};
private static void logDebugMessage() {
final StringBuilder b = DEBUG_MESSAGE.get();
LOG.debug(b.toString());
b.setLength(0);
}
static final int MIN_ARRAY_LENGTH = 32; static final int MIN_ARRAY_LENGTH = 32;
static final byte[] EMPTY_BYTE_ARRAY = {}; static final byte[] EMPTY_BYTE_ARRAY = {};
@ -160,27 +148,18 @@ public abstract class ByteArrayManager {
* via the {@link FixedLengthManager#recycle(byte[])} method. * via the {@link FixedLengthManager#recycle(byte[])} method.
*/ */
synchronized byte[] allocate() throws InterruptedException { synchronized byte[] allocate() throws InterruptedException {
if (LOG.isDebugEnabled()) { LOG.debug(", {}", this);
DEBUG_MESSAGE.get().append(", ").append(this);
}
for(; numAllocated >= maxAllocated;) { for(; numAllocated >= maxAllocated;) {
if (LOG.isDebugEnabled()) { LOG.debug(": wait ...");
DEBUG_MESSAGE.get().append(": wait ...");
logDebugMessage();
}
wait(); wait();
if (LOG.isDebugEnabled()) { LOG.debug("wake up: {}", this);
DEBUG_MESSAGE.get().append("wake up: ").append(this);
}
} }
numAllocated++; numAllocated++;
final byte[] array = freeQueue.poll(); final byte[] array = freeQueue.poll();
if (LOG.isDebugEnabled()) { LOG.debug(", recycled? {}", array != null);
DEBUG_MESSAGE.get().append(", recycled? ").append(array != null);
}
return array != null? array : new byte[byteArrayLength]; return array != null? array : new byte[byteArrayLength];
} }
@ -194,9 +173,7 @@ public abstract class ByteArrayManager {
synchronized int recycle(byte[] array) { synchronized int recycle(byte[] array) {
Preconditions.checkNotNull(array); Preconditions.checkNotNull(array);
Preconditions.checkArgument(array.length == byteArrayLength); Preconditions.checkArgument(array.length == byteArrayLength);
if (LOG.isDebugEnabled()) { LOG.debug(", {}", this);
DEBUG_MESSAGE.get().append(", ").append(this);
}
notify(); notify();
numAllocated--; numAllocated--;
@ -207,9 +184,7 @@ public abstract class ByteArrayManager {
} }
if (freeQueue.size() < maxAllocated - numAllocated) { if (freeQueue.size() < maxAllocated - numAllocated) {
if (LOG.isDebugEnabled()) { LOG.debug(", freeQueue.offer");
DEBUG_MESSAGE.get().append(", freeQueue.offer");
}
freeQueue.offer(array); freeQueue.offer(array);
} }
return freeQueue.size(); return freeQueue.size();
@ -349,9 +324,7 @@ public abstract class ByteArrayManager {
public byte[] newByteArray(final int arrayLength) public byte[] newByteArray(final int arrayLength)
throws InterruptedException { throws InterruptedException {
Preconditions.checkArgument(arrayLength >= 0); Preconditions.checkArgument(arrayLength >= 0);
if (LOG.isDebugEnabled()) { LOG.debug("allocate({})", arrayLength);
DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")");
}
final byte[] array; final byte[] array;
if (arrayLength == 0) { if (arrayLength == 0) {
@ -365,18 +338,12 @@ public abstract class ByteArrayManager {
final FixedLengthManager manager = final FixedLengthManager manager =
managers.get(powerOfTwo, aboveThreshold); managers.get(powerOfTwo, aboveThreshold);
if (LOG.isDebugEnabled()) { LOG.debug(": count={}, {}Threshold", count,
DEBUG_MESSAGE.get().append(": count=").append(count) aboveThreshold ? "above" : "below");
.append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
}
array = manager != null? manager.allocate(): new byte[powerOfTwo]; array = manager != null? manager.allocate(): new byte[powerOfTwo];
} }
if (LOG.isDebugEnabled()) { LOG.debug(", return byte[{}]", array.length);
DEBUG_MESSAGE.get().append(", return byte[")
.append(array.length).append("]");
logDebugMessage();
}
return array; return array;
} }
@ -391,10 +358,7 @@ public abstract class ByteArrayManager {
@Override @Override
public int release(final byte[] array) { public int release(final byte[] array) {
Preconditions.checkNotNull(array); Preconditions.checkNotNull(array);
if (LOG.isDebugEnabled()) { LOG.debug("recycle: array.length={}", array.length);
DEBUG_MESSAGE.get()
.append("recycle: array.length=").append(array.length);
}
final int freeQueueSize; final int freeQueueSize;
if (array.length == 0) { if (array.length == 0) {
@ -404,10 +368,7 @@ public abstract class ByteArrayManager {
freeQueueSize = manager == null? -1: manager.recycle(array); freeQueueSize = manager == null? -1: manager.recycle(array);
} }
if (LOG.isDebugEnabled()) { LOG.debug(", freeQueueSize={}", freeQueueSize);
DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize);
logDebugMessage();
}
return freeQueueSize; return freeQueueSize;
} }

View File

@ -134,9 +134,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
if (token != null) { if (token != null) {
fs.setDelegationToken(token); fs.setDelegationToken(token);
addRenewAction(fs); addRenewAction(fs);
if (LOG.isDebugEnabled()) { LOG.debug("Created new DT for {}", token.getService());
LOG.debug("Created new DT for {}", token.getService());
}
} }
hasInitedToken = true; hasInitedToken = true;
} }
@ -149,9 +147,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
synchronized void initDelegationToken(UserGroupInformation ugi) { synchronized void initDelegationToken(UserGroupInformation ugi) {
Token<?> token = selectDelegationToken(ugi); Token<?> token = selectDelegationToken(ugi);
if (token != null) { if (token != null) {
if (LOG.isDebugEnabled()) { LOG.debug("Found existing DT for {}", token.getService());
LOG.debug("Found existing DT for {}", token.getService());
}
fs.setDelegationToken(token); fs.setDelegationToken(token);
hasInitedToken = true; hasInitedToken = true;
} }

View File

@ -182,9 +182,7 @@ public class URLConnectionFactory {
public URLConnection openConnection(URL url, boolean isSpnego) public URLConnection openConnection(URL url, boolean isSpnego)
throws IOException, AuthenticationException { throws IOException, AuthenticationException {
if (isSpnego) { if (isSpnego) {
if (LOG.isDebugEnabled()) { LOG.debug("open AuthenticatedURL connection {}", url);
LOG.debug("open AuthenticatedURL connection {}", url);
}
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
return new AuthenticatedURL(new KerberosUgiAuthenticator(), return new AuthenticatedURL(new KerberosUgiAuthenticator(),

View File

@ -233,16 +233,12 @@ public class WebHdfsFileSystem extends FileSystem
// refetch tokens. even if ugi has credentials, don't attempt // refetch tokens. even if ugi has credentials, don't attempt
// to get another token to match hdfs/rpc behavior // to get another token to match hdfs/rpc behavior
if (token != null) { if (token != null) {
if(LOG.isDebugEnabled()) { LOG.debug("Using UGI token: {}", token);
LOG.debug("Using UGI token: {}", token);
}
canRefreshDelegationToken = false; canRefreshDelegationToken = false;
} else { } else {
token = getDelegationToken(null); token = getDelegationToken(null);
if (token != null) { if (token != null) {
if(LOG.isDebugEnabled()) { LOG.debug("Fetched new token: {}", token);
LOG.debug("Fetched new token: {}", token);
}
} else { // security is disabled } else { // security is disabled
canRefreshDelegationToken = false; canRefreshDelegationToken = false;
} }
@ -257,9 +253,7 @@ public class WebHdfsFileSystem extends FileSystem
boolean replaced = false; boolean replaced = false;
if (canRefreshDelegationToken) { if (canRefreshDelegationToken) {
Token<?> token = getDelegationToken(null); Token<?> token = getDelegationToken(null);
if(LOG.isDebugEnabled()) { LOG.debug("Replaced expired token: {}", token);
LOG.debug("Replaced expired token: {}", token);
}
setDelegationToken(token); setDelegationToken(token);
replaced = (token != null); replaced = (token != null);
} }
@ -442,9 +436,7 @@ public class WebHdfsFileSystem extends FileSystem
InetSocketAddress nnAddr = getCurrentNNAddr(); InetSocketAddress nnAddr = getCurrentNNAddr();
final URL url = new URL(getTransportScheme(), nnAddr.getHostName(), final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
nnAddr.getPort(), path + '?' + query); nnAddr.getPort(), path + '?' + query);
if (LOG.isTraceEnabled()) { LOG.trace("url={}", url);
LOG.trace("url={}", url);
}
return url; return url;
} }
@ -479,9 +471,7 @@ public class WebHdfsFileSystem extends FileSystem
+ Param.toSortedString("&", getAuthParameters(op)) + Param.toSortedString("&", getAuthParameters(op))
+ Param.toSortedString("&", parameters); + Param.toSortedString("&", parameters);
final URL url = getNamenodeURL(path, query); final URL url = getNamenodeURL(path, query);
if (LOG.isTraceEnabled()) { LOG.trace("url={}", url);
LOG.trace("url={}", url);
}
return url; return url;
} }
@ -769,9 +759,7 @@ public class WebHdfsFileSystem extends FileSystem
} catch (Exception e) { // catch json parser errors } catch (Exception e) { // catch json parser errors
final IOException ioe = final IOException ioe =
new IOException("Response decoding failure: "+e.toString(), e); new IOException("Response decoding failure: "+e.toString(), e);
if (LOG.isDebugEnabled()) { LOG.debug("Response decoding failure.", e);
LOG.debug("Response decoding failure: {}", e.toString(), e);
}
throw ioe; throw ioe;
} finally { } finally {
conn.disconnect(); conn.disconnect();
@ -1242,9 +1230,7 @@ public class WebHdfsFileSystem extends FileSystem
cancelDelegationToken(delegationToken); cancelDelegationToken(delegationToken);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
if (LOG.isDebugEnabled()) { LOG.debug("Token cancel failed: ", ioe);
LOG.debug("Token cancel failed: ", ioe);
}
} finally { } finally {
super.close(); super.close();
} }

View File

@ -1000,6 +1000,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8696. Make the lower and higher watermark in the DN Netty server HDFS-8696. Make the lower and higher watermark in the DN Netty server
configurable. (Xiaobing Zhou via wheat9) configurable. (Xiaobing Zhou via wheat9)
HDFS-8971. Remove guards when calling LOG.debug() and LOG.trace() in client
package. (Mingliang Liu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than