HDFS-9170. Move libhdfs / fuse-dfs / libwebhdfs to hdfs-client. Contributed by Haohui Mai.
This commit is contained in:
parent
bc0487c29c
commit
cf873fff9f
|
@ -349,17 +349,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
||||||
reader = getLegacyBlockReaderLocal();
|
reader = getLegacyBlockReaderLocal();
|
||||||
if (reader != null) {
|
if (reader != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: returning new legacy block reader local.", this);
|
||||||
LOG.trace(this + ": returning new legacy block reader local.");
|
|
||||||
}
|
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
reader = getBlockReaderLocal();
|
reader = getBlockReaderLocal();
|
||||||
if (reader != null) {
|
if (reader != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: returning new block reader local.", this);
|
||||||
LOG.trace(this + ": returning new block reader local.");
|
|
||||||
}
|
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,10 +363,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (scConf.isDomainSocketDataTraffic()) {
|
if (scConf.isDomainSocketDataTraffic()) {
|
||||||
reader = getRemoteBlockReaderFromDomain();
|
reader = getRemoteBlockReaderFromDomain();
|
||||||
if (reader != null) {
|
if (reader != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: returning new remote block reader using UNIX domain "
|
||||||
LOG.trace(this + ": returning new remote block reader using " +
|
+ "socket on {}", this, pathInfo.getPath());
|
||||||
"UNIX domain socket on " + pathInfo.getPath());
|
|
||||||
}
|
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -405,10 +399,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
setVisibleLength(visibleLength).
|
setVisibleLength(visibleLength).
|
||||||
build();
|
build();
|
||||||
if (accessor == null) {
|
if (accessor == null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: No ReplicaAccessor created by {}",
|
||||||
LOG.trace(this + ": No ReplicaAccessor created by " +
|
this, cls.getName());
|
||||||
cls.getName());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
||||||
}
|
}
|
||||||
|
@ -427,14 +419,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
* first introduced in HDFS-2246.
|
* first introduced in HDFS-2246.
|
||||||
*/
|
*/
|
||||||
private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: trying to construct BlockReaderLocalLegacy", this);
|
||||||
LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
|
|
||||||
}
|
|
||||||
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
|
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: can't construct BlockReaderLocalLegacy because the address"
|
||||||
LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
|
+ "{} is not local", this, inetSocketAddress);
|
||||||
"the address " + inetSocketAddress + " is not local");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (clientContext.getDisableLegacyBlockReaderLocal()) {
|
if (clientContext.getDisableLegacyBlockReaderLocal()) {
|
||||||
|
@ -470,10 +458,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockReader getBlockReaderLocal() throws InvalidToken {
|
private BlockReader getBlockReaderLocal() throws InvalidToken {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
|
||||||
LOG.trace(this + ": trying to construct a BlockReaderLocal " +
|
+ " reads.", this);
|
||||||
"for short-circuit reads.");
|
|
||||||
}
|
|
||||||
if (pathInfo == null) {
|
if (pathInfo == null) {
|
||||||
pathInfo = clientContext.getDomainSocketFactory()
|
pathInfo = clientContext.getDomainSocketFactory()
|
||||||
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
|
.getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
|
||||||
|
@ -488,10 +474,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
|
||||||
InvalidToken exc = info.getInvalidTokenException();
|
InvalidToken exc = info.getInvalidTokenException();
|
||||||
if (exc != null) {
|
if (exc != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: got InvalidToken exception while trying to construct "
|
||||||
LOG.trace(this + ": got InvalidToken exception while trying to " +
|
+ "BlockReaderLocal via {}", this, pathInfo.getPath());
|
||||||
"construct BlockReaderLocal via " + pathInfo.getPath());
|
|
||||||
}
|
|
||||||
throw exc;
|
throw exc;
|
||||||
}
|
}
|
||||||
if (info.getReplica() == null) {
|
if (info.getReplica() == null) {
|
||||||
|
@ -527,9 +511,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
|
createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
|
||||||
if (info != null) return info;
|
if (info != null) return info;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: trying to create ShortCircuitReplicaInfo.", this);
|
||||||
LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
|
|
||||||
}
|
|
||||||
BlockReaderPeer curPeer;
|
BlockReaderPeer curPeer;
|
||||||
while (true) {
|
while (true) {
|
||||||
curPeer = nextDomainPeer();
|
curPeer = nextDomainPeer();
|
||||||
|
@ -544,10 +526,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
|
||||||
clientName);
|
clientName);
|
||||||
if (usedPeer.booleanValue()) {
|
if (usedPeer.booleanValue()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: allocShmSlot used up our previous socket {}. "
|
||||||
LOG.trace(this + ": allocShmSlot used up our previous socket " +
|
+ "Allocating a new one...", this, peer.getDomainSocket());
|
||||||
peer.getDomainSocket() + ". Allocating a new one...");
|
|
||||||
}
|
|
||||||
curPeer = nextDomainPeer();
|
curPeer = nextDomainPeer();
|
||||||
if (curPeer == null) break;
|
if (curPeer == null) break;
|
||||||
peer = (DomainPeer)curPeer.peer;
|
peer = (DomainPeer)curPeer.peer;
|
||||||
|
@ -562,9 +542,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (curPeer.fromCache) {
|
if (curPeer.fromCache) {
|
||||||
// Handle an I/O error we got when using a cached socket.
|
// Handle an I/O error we got when using a cached socket.
|
||||||
// These are considered less serious, because the socket may be stale.
|
// These are considered less serious, because the socket may be stale.
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: closing stale domain peer {}", this, peer, e);
|
||||||
LOG.debug(this + ": closing stale domain peer " + peer, e);
|
|
||||||
}
|
|
||||||
IOUtilsClient.cleanup(LOG, peer);
|
IOUtilsClient.cleanup(LOG, peer);
|
||||||
} else {
|
} else {
|
||||||
// Handle an I/O error we got when using a newly created socket.
|
// Handle an I/O error we got when using a newly created socket.
|
||||||
|
@ -617,7 +595,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
ExtendedBlockId key =
|
ExtendedBlockId key =
|
||||||
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
|
||||||
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
|
||||||
LOG.trace("Sending receipt verification byte for slot " + slot);
|
LOG.trace("Sending receipt verification byte for slot {}", slot);
|
||||||
sock.getOutputStream().write(0);
|
sock.getOutputStream().write(0);
|
||||||
}
|
}
|
||||||
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
|
||||||
|
@ -650,9 +628,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
String msg = "access control error while " +
|
String msg = "access control error while " +
|
||||||
"attempting to set up short-circuit access to " +
|
"attempting to set up short-circuit access to " +
|
||||||
fileName + resp.getMessage();
|
fileName + resp.getMessage();
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}:{}", this, msg);
|
||||||
LOG.debug(this + ":" + msg);
|
|
||||||
}
|
|
||||||
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
|
return new ShortCircuitReplicaInfo(new InvalidToken(msg));
|
||||||
default:
|
default:
|
||||||
LOG.warn(this + ": unknown response code " + resp.getStatus() +
|
LOG.warn(this + ": unknown response code " + resp.getStatus() +
|
||||||
|
@ -684,10 +660,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
" is not usable.", this, pathInfo);
|
" is not usable.", this, pathInfo);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
|
||||||
LOG.trace(this + ": trying to create a remote block reader from the " +
|
+ "socket at {}", this, pathInfo.getPath());
|
||||||
"UNIX domain socket at " + pathInfo.getPath());
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
BlockReaderPeer curPeer = nextDomainPeer();
|
BlockReaderPeer curPeer = nextDomainPeer();
|
||||||
|
@ -701,19 +675,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
IOUtilsClient.cleanup(LOG, peer);
|
IOUtilsClient.cleanup(LOG, peer);
|
||||||
if (isSecurityException(ioe)) {
|
if (isSecurityException(ioe)) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: got security exception while constructing a remote "
|
||||||
LOG.trace(this + ": got security exception while constructing " +
|
+ " block reader from the unix domain socket at {}",
|
||||||
"a remote block reader from the unix domain socket at " +
|
this, pathInfo.getPath(), ioe);
|
||||||
pathInfo.getPath(), ioe);
|
|
||||||
}
|
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
if (curPeer.fromCache) {
|
if (curPeer.fromCache) {
|
||||||
// Handle an I/O error we got when using a cached peer. These are
|
// Handle an I/O error we got when using a cached peer. These are
|
||||||
// considered less serious, because the underlying socket may be stale.
|
// considered less serious, because the underlying socket may be stale.
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
|
||||||
LOG.debug("Closed potentially stale domain peer " + peer, ioe);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// Handle an I/O error we got when using a newly created domain peer.
|
// Handle an I/O error we got when using a newly created domain peer.
|
||||||
// We temporarily disable the domain socket path for a few minutes in
|
// We temporarily disable the domain socket path for a few minutes in
|
||||||
|
@ -747,10 +717,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
* If there was another problem.
|
* If there was another problem.
|
||||||
*/
|
*/
|
||||||
private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
|
private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: trying to create a remote block reader from a TCP socket",
|
||||||
LOG.trace(this + ": trying to create a remote block reader from a " +
|
this);
|
||||||
"TCP socket");
|
|
||||||
}
|
|
||||||
BlockReader blockReader = null;
|
BlockReader blockReader = null;
|
||||||
while (true) {
|
while (true) {
|
||||||
BlockReaderPeer curPeer = null;
|
BlockReaderPeer curPeer = null;
|
||||||
|
@ -763,19 +731,15 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
return blockReader;
|
return blockReader;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (isSecurityException(ioe)) {
|
if (isSecurityException(ioe)) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: got security exception while constructing a remote "
|
||||||
LOG.trace(this + ": got security exception while constructing " +
|
+ "block reader from {}", this, peer, ioe);
|
||||||
"a remote block reader from " + peer, ioe);
|
|
||||||
}
|
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
if ((curPeer != null) && curPeer.fromCache) {
|
if ((curPeer != null) && curPeer.fromCache) {
|
||||||
// Handle an I/O error we got when using a cached peer. These are
|
// Handle an I/O error we got when using a cached peer. These are
|
||||||
// considered less serious, because the underlying socket may be
|
// considered less serious, because the underlying socket may be
|
||||||
// stale.
|
// stale.
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Closed potentially stale remote peer {}", peer, ioe);
|
||||||
LOG.debug("Closed potentially stale remote peer " + peer, ioe);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// Handle an I/O error we got when using a newly created peer.
|
// Handle an I/O error we got when using a newly created peer.
|
||||||
LOG.warn("I/O error constructing remote block reader.", ioe);
|
LOG.warn("I/O error constructing remote block reader.", ioe);
|
||||||
|
@ -808,9 +772,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (remainingCacheTries > 0) {
|
if (remainingCacheTries > 0) {
|
||||||
Peer peer = clientContext.getPeerCache().get(datanode, true);
|
Peer peer = clientContext.getPeerCache().get(datanode, true);
|
||||||
if (peer != null) {
|
if (peer != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
|
||||||
LOG.trace("nextDomainPeer: reusing existing peer " + peer);
|
|
||||||
}
|
|
||||||
return new BlockReaderPeer(peer, true);
|
return new BlockReaderPeer(peer, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -832,24 +794,18 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
if (remainingCacheTries > 0) {
|
if (remainingCacheTries > 0) {
|
||||||
Peer peer = clientContext.getPeerCache().get(datanode, false);
|
Peer peer = clientContext.getPeerCache().get(datanode, false);
|
||||||
if (peer != null) {
|
if (peer != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("nextTcpPeer: reusing existing peer {}", peer);
|
||||||
LOG.trace("nextTcpPeer: reusing existing peer " + peer);
|
|
||||||
}
|
|
||||||
return new BlockReaderPeer(peer, true);
|
return new BlockReaderPeer(peer, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
|
Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
|
||||||
datanode);
|
datanode);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("nextTcpPeer: created newConnectedPeer {}", peer);
|
||||||
LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
|
|
||||||
}
|
|
||||||
return new BlockReaderPeer(peer, false);
|
return new BlockReaderPeer(peer, false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("nextTcpPeer: failed to create newConnectedPeer connected to"
|
||||||
LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
|
+ "{}", datanode);
|
||||||
"connected to " + datanode);
|
|
||||||
}
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -412,17 +412,10 @@ class BlockReaderLocal implements BlockReader {
|
||||||
public synchronized int read(ByteBuffer buf) throws IOException {
|
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||||
boolean canSkipChecksum = createNoChecksumContext();
|
boolean canSkipChecksum = createNoChecksumContext();
|
||||||
try {
|
try {
|
||||||
String traceString = null;
|
String traceFormatStr = "read(buf.remaining={}, block={}, filename={}, "
|
||||||
if (LOG.isTraceEnabled()) {
|
+ "canSkipChecksum={})";
|
||||||
traceString = new StringBuilder().
|
LOG.trace(traceFormatStr + ": starting",
|
||||||
append("read(").
|
buf.remaining(), block, filename, canSkipChecksum);
|
||||||
append("buf.remaining=").append(buf.remaining()).
|
|
||||||
append(", block=").append(block).
|
|
||||||
append(", filename=").append(filename).
|
|
||||||
append(", canSkipChecksum=").append(canSkipChecksum).
|
|
||||||
append(")").toString();
|
|
||||||
LOG.info(traceString + ": starting");
|
|
||||||
}
|
|
||||||
int nRead;
|
int nRead;
|
||||||
try {
|
try {
|
||||||
if (canSkipChecksum && zeroReadaheadRequested) {
|
if (canSkipChecksum && zeroReadaheadRequested) {
|
||||||
|
@ -431,14 +424,12 @@ class BlockReaderLocal implements BlockReader {
|
||||||
nRead = readWithBounceBuffer(buf, canSkipChecksum);
|
nRead = readWithBounceBuffer(buf, canSkipChecksum);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace(traceFormatStr + ": I/O error",
|
||||||
LOG.info(traceString + ": I/O error", e);
|
buf.remaining(), block, filename, canSkipChecksum, e);
|
||||||
}
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace(traceFormatStr + ": returning {}",
|
||||||
LOG.info(traceString + ": returning " + nRead);
|
buf.remaining(), block, filename, canSkipChecksum, nRead);
|
||||||
}
|
|
||||||
return nRead;
|
return nRead;
|
||||||
} finally {
|
} finally {
|
||||||
if (canSkipChecksum) releaseNoChecksumContext();
|
if (canSkipChecksum) releaseNoChecksumContext();
|
||||||
|
@ -490,10 +481,8 @@ class BlockReaderLocal implements BlockReader {
|
||||||
}
|
}
|
||||||
dataBuf.limit(dataBuf.position());
|
dataBuf.limit(dataBuf.position());
|
||||||
dataBuf.position(Math.min(dataBuf.position(), slop));
|
dataBuf.position(Math.min(dataBuf.position(), slop));
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("loaded {} bytes into bounce buffer from offset {} of {}",
|
||||||
LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
|
dataBuf.remaining(), oldDataPos, block);
|
||||||
"buffer from offset " + oldDataPos + " of " + block);
|
|
||||||
}
|
|
||||||
return dataBuf.limit() != maxReadaheadLength;
|
return dataBuf.limit() != maxReadaheadLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,18 +554,10 @@ class BlockReaderLocal implements BlockReader {
|
||||||
boolean canSkipChecksum = createNoChecksumContext();
|
boolean canSkipChecksum = createNoChecksumContext();
|
||||||
int nRead;
|
int nRead;
|
||||||
try {
|
try {
|
||||||
String traceString = null;
|
final String traceFormatStr = "read(arr.length={}, off={}, len={}, "
|
||||||
if (LOG.isTraceEnabled()) {
|
+ "filename={}, block={}, canSkipChecksum={})";
|
||||||
traceString = new StringBuilder().
|
LOG.trace(traceFormatStr + ": starting",
|
||||||
append("read(arr.length=").append(arr.length).
|
arr.length, off, len, filename, block, canSkipChecksum);
|
||||||
append(", off=").append(off).
|
|
||||||
append(", len=").append(len).
|
|
||||||
append(", filename=").append(filename).
|
|
||||||
append(", block=").append(block).
|
|
||||||
append(", canSkipChecksum=").append(canSkipChecksum).
|
|
||||||
append(")").toString();
|
|
||||||
LOG.trace(traceString + ": starting");
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
if (canSkipChecksum && zeroReadaheadRequested) {
|
if (canSkipChecksum && zeroReadaheadRequested) {
|
||||||
nRead = readWithoutBounceBuffer(arr, off, len);
|
nRead = readWithoutBounceBuffer(arr, off, len);
|
||||||
|
@ -584,14 +565,12 @@ class BlockReaderLocal implements BlockReader {
|
||||||
nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
|
nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace(traceFormatStr + ": I/O error",
|
||||||
LOG.trace(traceString + ": I/O error", e);
|
arr.length, off, len, filename, block, canSkipChecksum, e);
|
||||||
}
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace(traceFormatStr + ": returning {}",
|
||||||
LOG.trace(traceString + ": returning " + nRead);
|
arr.length, off, len, filename, block, canSkipChecksum, nRead);
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
if (canSkipChecksum) releaseNoChecksumContext();
|
if (canSkipChecksum) releaseNoChecksumContext();
|
||||||
}
|
}
|
||||||
|
@ -634,11 +613,9 @@ class BlockReaderLocal implements BlockReader {
|
||||||
dataBuf.position(dataBuf.position() + discardedFromBuf);
|
dataBuf.position(dataBuf.position() + discardedFromBuf);
|
||||||
remaining -= discardedFromBuf;
|
remaining -= discardedFromBuf;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("skip(n={}, block={}, filename={}): discarded {} bytes from "
|
||||||
LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
|
+ "dataBuf and advanced dataPos by {}",
|
||||||
filename + "): discarded " + discardedFromBuf + " bytes from " +
|
n, block, filename, discardedFromBuf, remaining);
|
||||||
"dataBuf and advanced dataPos by " + remaining);
|
|
||||||
}
|
|
||||||
dataPos += remaining;
|
dataPos += remaining;
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -653,9 +630,7 @@ class BlockReaderLocal implements BlockReader {
|
||||||
public synchronized void close() throws IOException {
|
public synchronized void close() throws IOException {
|
||||||
if (closed) return;
|
if (closed) return;
|
||||||
closed = true;
|
closed = true;
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("close(filename={}, block={})", filename, block);
|
||||||
LOG.trace("close(filename=" + filename + ", block=" + block + ")");
|
|
||||||
}
|
|
||||||
replica.unref();
|
replica.unref();
|
||||||
freeDataBufIfExists();
|
freeDataBufIfExists();
|
||||||
freeChecksumBufIfExists();
|
freeChecksumBufIfExists();
|
||||||
|
@ -705,11 +680,9 @@ class BlockReaderLocal implements BlockReader {
|
||||||
(opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
|
(opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
|
||||||
if (anchor) {
|
if (anchor) {
|
||||||
if (!createNoChecksumContext()) {
|
if (!createNoChecksumContext()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("can't get an mmap for {} of {} since SKIP_CHECKSUMS was not "
|
||||||
LOG.trace("can't get an mmap for " + block + " of " + filename +
|
+ "given, we aren't skipping checksums, and the block is not "
|
||||||
" since SKIP_CHECKSUMS was not given, " +
|
+ "mlocked.", block, filename);
|
||||||
"we aren't skipping checksums, and the block is not mlocked.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -221,11 +221,9 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
File blkfile = new File(pathinfo.getBlockPath());
|
File blkfile = new File(pathinfo.getBlockPath());
|
||||||
dataIn = new FileInputStream(blkfile);
|
dataIn = new FileInputStream(blkfile);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("New BlockReaderLocalLegacy for file {} of size {} startOffset "
|
||||||
LOG.debug("New BlockReaderLocalLegacy for file " + blkfile + " of size "
|
+ "{} length {} short circuit checksum {}",
|
||||||
+ blkfile.length() + " startOffset " + startOffset + " length "
|
blkfile, blkfile.length(), startOffset, length, !skipChecksumCheck);
|
||||||
+ length + " short circuit checksum " + !skipChecksumCheck);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!skipChecksumCheck) {
|
if (!skipChecksumCheck) {
|
||||||
// get the metadata file
|
// get the metadata file
|
||||||
|
@ -292,9 +290,7 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
// channel for the DataNode to notify the client that the path has been
|
// channel for the DataNode to notify the client that the path has been
|
||||||
// invalidated. Therefore, our only option is to skip caching.
|
// invalidated. Therefore, our only option is to skip caching.
|
||||||
if (pathinfo != null && !storageType.isTransient()) {
|
if (pathinfo != null && !storageType.isTransient()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Cached location of block {} as {}", blk, pathinfo);
|
||||||
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
|
|
||||||
}
|
|
||||||
localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
|
localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -603,9 +599,7 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(byte[] buf, int off, int len) throws IOException {
|
public synchronized int read(byte[] buf, int off, int len) throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("read off {} len {}", off, len);
|
||||||
LOG.trace("read off " + off + " len " + len);
|
|
||||||
}
|
|
||||||
if (!verifyChecksum) {
|
if (!verifyChecksum) {
|
||||||
return dataIn.read(buf, off, len);
|
return dataIn.read(buf, off, len);
|
||||||
}
|
}
|
||||||
|
@ -624,9 +618,7 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized long skip(long n) throws IOException {
|
public synchronized long skip(long n) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("skip {}", n);
|
||||||
LOG.debug("skip " + n);
|
|
||||||
}
|
|
||||||
if (n <= 0) {
|
if (n <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,9 +425,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
final int idx = r.nextInt(localInterfaceAddrs.length);
|
final int idx = r.nextInt(localInterfaceAddrs.length);
|
||||||
final SocketAddress addr = localInterfaceAddrs[idx];
|
final SocketAddress addr = localInterfaceAddrs[idx];
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Using local interface {}", addr);
|
||||||
LOG.debug("Using local interface " + addr);
|
|
||||||
}
|
|
||||||
return addr;
|
return addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1298,9 +1296,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
InetSocketAddress[] favoredNodes) throws IOException {
|
InetSocketAddress[] favoredNodes) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
final FsPermission masked = applyUMask(permission);
|
final FsPermission masked = applyUMask(permission);
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("{}: masked={}", src, masked);
|
||||||
LOG.debug(src + ": masked=" + masked);
|
|
||||||
}
|
|
||||||
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
||||||
src, masked, flag, createParent, replication, blockSize, progress,
|
src, masked, flag, createParent, replication, blockSize, progress,
|
||||||
buffersize, dfsClientConf.createChecksum(checksumOpt),
|
buffersize, dfsClientConf.createChecksum(checksumOpt),
|
||||||
|
@ -1893,10 +1889,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
smallBufferSize));
|
smallBufferSize));
|
||||||
in = new DataInputStream(pair.in);
|
in = new DataInputStream(pair.in);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("write to {}: {}, block={}",
|
||||||
LOG.debug("write to " + datanodes[j] + ": "
|
datanodes[j], Op.BLOCK_CHECKSUM, block);
|
||||||
+ Op.BLOCK_CHECKSUM + ", block=" + block);
|
|
||||||
}
|
|
||||||
// get block MD5
|
// get block MD5
|
||||||
new Sender(out).blockChecksum(block, lb.getBlockToken());
|
new Sender(out).blockChecksum(block, lb.getBlockToken());
|
||||||
|
|
||||||
|
@ -1960,12 +1954,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
} catch (InvalidBlockTokenException ibte) {
|
} catch (InvalidBlockTokenException ibte) {
|
||||||
if (i > lastRetriedIndex) {
|
if (i > lastRetriedIndex) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
||||||
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
+ "for file {} for block {} from datanode {}. Will retry the "
|
||||||
+ "for file " + src + " for block " + block
|
+ "block once.",
|
||||||
+ " from datanode " + datanodes[j]
|
src, block, datanodes[j]);
|
||||||
+ ". Will retry the block once.");
|
|
||||||
}
|
|
||||||
lastRetriedIndex = i;
|
lastRetriedIndex = i;
|
||||||
done = true; // actually it's not done; but we'll retry
|
done = true; // actually it's not done; but we'll retry
|
||||||
i--; // repeat at i-th block
|
i--; // repeat at i-th block
|
||||||
|
@ -2019,9 +2011,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
try {
|
try {
|
||||||
sock = socketFactory.createSocket();
|
sock = socketFactory.createSocket();
|
||||||
String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
|
String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Connecting to datanode {}", dnAddr);
|
||||||
LOG.debug("Connecting to datanode " + dnAddr);
|
|
||||||
}
|
|
||||||
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
|
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
|
||||||
sock.setSoTimeout(timeout);
|
sock.setSoTimeout(timeout);
|
||||||
|
|
||||||
|
@ -2630,9 +2620,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
absPermission = applyUMask(null);
|
absPermission = applyUMask(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("{}: masked={}", src, absPermission);
|
||||||
LOG.debug(src + ": masked=" + absPermission);
|
|
||||||
}
|
|
||||||
TraceScope scope = tracer.newScope("mkdir");
|
TraceScope scope = tracer.newScope("mkdir");
|
||||||
try {
|
try {
|
||||||
return namenode.mkdirs(src, absPermission, createParent);
|
return namenode.mkdirs(src, absPermission, createParent);
|
||||||
|
@ -3123,9 +3111,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
|
HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Using hedged reads; pool threads={}", num);
|
||||||
LOG.debug("Using hedged reads; pool threads=" + num);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadPoolExecutor getHedgedReadsThreadPool() {
|
ThreadPoolExecutor getHedgedReadsThreadPool() {
|
||||||
|
|
|
@ -315,9 +315,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
if (locatedBlocks == null || refresh) {
|
if (locatedBlocks == null || refresh) {
|
||||||
newInfo = dfsClient.getLocatedBlocks(src, 0);
|
newInfo = dfsClient.getLocatedBlocks(src, 0);
|
||||||
}
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("newInfo = {}", newInfo);
|
||||||
DFSClient.LOG.debug("newInfo = " + newInfo);
|
|
||||||
}
|
|
||||||
if (newInfo == null) {
|
if (newInfo == null) {
|
||||||
throw new IOException("Cannot open filename " + src);
|
throw new IOException("Cannot open filename " + src);
|
||||||
}
|
}
|
||||||
|
@ -383,10 +381,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
replicaNotFoundCount--;
|
replicaNotFoundCount--;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode {}"
|
||||||
DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
|
+ " for block {}", datanode, locatedblock.getBlock(), ioe);
|
||||||
+ datanode + " for block " + locatedblock.getBlock(), ioe);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
if (cdp != null) {
|
if (cdp != null) {
|
||||||
RPC.stopProxy(cdp);
|
RPC.stopProxy(cdp);
|
||||||
|
@ -1067,9 +1063,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
}
|
}
|
||||||
final String dnAddr =
|
final String dnAddr =
|
||||||
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
|
||||||
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
|
||||||
}
|
|
||||||
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
||||||
return new DNAddrPair(chosenNode, targetAddr, storageType);
|
return new DNAddrPair(chosenNode, targetAddr, storageType);
|
||||||
}
|
}
|
||||||
|
@ -1309,11 +1303,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
future.get();
|
future.get();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
|
||||||
DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
|
+ "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
|
||||||
+ "ms to read from " + chosenNode.info
|
|
||||||
+ "; spawning hedged read");
|
|
||||||
}
|
|
||||||
// Ignore this node on next go around.
|
// Ignore this node on next go around.
|
||||||
ignored.add(chosenNode.info);
|
ignored.add(chosenNode.info);
|
||||||
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
||||||
|
@ -1340,10 +1331,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
.submit(getFromDataNodeCallable);
|
.submit(getFromDataNodeCallable);
|
||||||
futures.add(oneMoreRequest);
|
futures.add(oneMoreRequest);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
|
||||||
DFSClient.LOG.debug("Failed getting node for hedged read: "
|
ioe.getMessage());
|
||||||
+ ioe.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// if not succeeded. Submit callables for each datanode in a loop, wait
|
// if not succeeded. Submit callables for each datanode in a loop, wait
|
||||||
// for a fixed interval and get the result from the fastest one.
|
// for a fixed interval and get the result from the fastest one.
|
||||||
|
@ -1599,11 +1588,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
throw new IOException(errMsg);
|
throw new IOException(errMsg);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {//make following read to retry
|
} catch (IOException e) {//make following read to retry
|
||||||
if(DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Exception while seek to {} from {} of {} from "
|
||||||
DFSClient.LOG.debug("Exception while seek to " + targetPos
|
+ "{}", targetPos, getCurrentBlock(), src, currentNode, e);
|
||||||
+ " from " + getCurrentBlock() + " of " + src + " from "
|
|
||||||
+ currentNode, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1819,20 +1805,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
} else {
|
} else {
|
||||||
length63 = 1 + curEnd - curPos;
|
length63 = 1 + curEnd - curPos;
|
||||||
if (length63 <= 0) {
|
if (length63 <= 0) {
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {}"
|
||||||
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
|
+ " of {}; {} bytes left in block. blockPos={}; curPos={};"
|
||||||
curPos + " of " + src + "; " + length63 + " bytes left in block. " +
|
+ "curEnd={}",
|
||||||
"blockPos=" + blockPos + "; curPos=" + curPos +
|
curPos, src, length63, blockPos, curPos, curEnd);
|
||||||
"; curEnd=" + curEnd);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Reducing read length from {} to {} to avoid going "
|
||||||
DFSClient.LOG.debug("Reducing read length from " + maxLength +
|
+ "more than one byte past the end of the block. blockPos={}; "
|
||||||
" to " + length63 + " to avoid going more than one byte " +
|
+" curPos={}; curEnd={}",
|
||||||
"past the end of the block. blockPos=" + blockPos +
|
maxLength, length63, blockPos, curPos, curEnd);
|
||||||
"; curPos=" + curPos + "; curEnd=" + curEnd);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
|
// Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
|
||||||
int length;
|
int length;
|
||||||
|
@ -1846,28 +1828,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
// So we can't mmap the parts of the block higher than the 2 GB offset.
|
// So we can't mmap the parts of the block higher than the 2 GB offset.
|
||||||
// FIXME: we could work around this with multiple memory maps.
|
// FIXME: we could work around this with multiple memory maps.
|
||||||
// See HDFS-5101.
|
// See HDFS-5101.
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset {} "
|
||||||
DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
|
+ " of {}; 31-bit MappedByteBuffer limit exceeded. blockPos={}, "
|
||||||
curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
|
+ "curEnd={}", curPos, src, blockPos, curEnd);
|
||||||
"exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
length = (int)length31;
|
length = (int)length31;
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("Reducing read length from {} to {} to avoid 31-bit "
|
||||||
DFSClient.LOG.debug("Reducing read length from " + maxLength +
|
+ "limit. blockPos={}; curPos={}; curEnd={}",
|
||||||
" to " + length + " to avoid 31-bit limit. " +
|
maxLength, length, blockPos, curPos, curEnd);
|
||||||
"blockPos=" + blockPos + "; curPos=" + curPos +
|
|
||||||
"; curEnd=" + curEnd);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
final ClientMmap clientMmap = blockReader.getClientMmap(opts);
|
final ClientMmap clientMmap = blockReader.getClientMmap(opts);
|
||||||
if (clientMmap == null) {
|
if (clientMmap == null) {
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("unable to perform a zero-copy read from offset {} of"
|
||||||
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
|
+ " {}; BlockReader#getClientMmap returned null.", curPos, src);
|
||||||
curPos + " of " + src + "; BlockReader#getClientMmap returned " +
|
|
||||||
"null.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
@ -1881,11 +1855,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
synchronized (infoLock) {
|
synchronized (infoLock) {
|
||||||
readStatistics.addZeroCopyBytes(length);
|
readStatistics.addZeroCopyBytes(length);
|
||||||
}
|
}
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("readZeroCopy read {} bytes from offset {} via the "
|
||||||
DFSClient.LOG.debug("readZeroCopy read " + length +
|
+ "zero-copy read path. blockEnd = {}", length, curPos, blockEnd);
|
||||||
" bytes from offset " + curPos + " via the zero-copy read " +
|
|
||||||
"path. blockEnd = " + blockEnd);
|
|
||||||
}
|
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) {
|
if (!success) {
|
||||||
|
|
|
@ -190,9 +190,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||||
this.cachingStrategy = new AtomicReference<CachingStrategy>(
|
this.cachingStrategy = new AtomicReference<CachingStrategy>(
|
||||||
dfsClient.getDefaultWriteCachingStrategy());
|
dfsClient.getDefaultWriteCachingStrategy());
|
||||||
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
|
if (progress != null) {
|
||||||
DFSClient.LOG.debug(
|
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
|
||||||
"Set non-null progress callback on DFSOutputStream " + src);
|
+"{}", src);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||||
|
@ -365,12 +365,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
final int chunkSize = csize + getChecksumSize();
|
final int chunkSize = csize + getChecksumSize();
|
||||||
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
|
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
|
||||||
packetSize = chunkSize*chunksPerPacket;
|
packetSize = chunkSize*chunksPerPacket;
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("computePacketChunkSize: src={}, chunkSize={}, "
|
||||||
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
|
+ "chunksPerPacket={}, packetSize={}",
|
||||||
", chunkSize=" + chunkSize +
|
src, chunkSize, chunksPerPacket, packetSize);
|
||||||
", chunksPerPacket=" + chunksPerPacket +
|
|
||||||
", packetSize=" + packetSize);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TraceScope createWriteTraceScope() {
|
protected TraceScope createWriteTraceScope() {
|
||||||
|
@ -397,14 +394,10 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
if (currentPacket == null) {
|
if (currentPacket == null) {
|
||||||
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
|
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
|
||||||
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
|
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno={},"
|
||||||
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
+ " src={}, packetSize={}, chunksPerPacket={}, bytesCurBlock={}",
|
||||||
currentPacket.getSeqno() +
|
currentPacket.getSeqno(), src, packetSize, chunksPerPacket,
|
||||||
", src=" + src +
|
getStreamer().getBytesCurBlock());
|
||||||
", packetSize=" + packetSize +
|
|
||||||
", chunksPerPacket=" + chunksPerPacket +
|
|
||||||
", bytesCurBlock=" + getStreamer().getBytesCurBlock());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
currentPacket.writeChecksum(checksum, ckoff, cklen);
|
currentPacket.writeChecksum(checksum, ckoff, cklen);
|
||||||
|
@ -563,12 +556,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
int numKept = flushBuffer(!endBlock, true);
|
int numKept = flushBuffer(!endBlock, true);
|
||||||
// bytesCurBlock potentially incremented if there was buffered data
|
// bytesCurBlock potentially incremented if there was buffered data
|
||||||
|
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
DFSClient.LOG.debug("DFSClient flush(): bytesCurBlock={}, "
|
||||||
DFSClient.LOG.debug("DFSClient flush(): "
|
+ "lastFlushOffset={}, createNewBlock={}",
|
||||||
+ " bytesCurBlock=" + getStreamer().getBytesCurBlock()
|
getStreamer().getBytesCurBlock(), lastFlushOffset, endBlock);
|
||||||
+ " lastFlushOffset=" + lastFlushOffset
|
|
||||||
+ " createNewBlock=" + endBlock);
|
|
||||||
}
|
|
||||||
// Flush only if we haven't already flushed till this offset.
|
// Flush only if we haven't already flushed till this offset.
|
||||||
if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
|
if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
|
||||||
assert getStreamer().getBytesCurBlock() > lastFlushOffset;
|
assert getStreamer().getBytesCurBlock() > lastFlushOffset;
|
||||||
|
|
|
@ -462,19 +462,13 @@ public class DFSUtilClient {
|
||||||
InetAddress addr = targetAddr.getAddress();
|
InetAddress addr = targetAddr.getAddress();
|
||||||
Boolean cached = localAddrMap.get(addr.getHostAddress());
|
Boolean cached = localAddrMap.get(addr.getHostAddress());
|
||||||
if (cached != null) {
|
if (cached != null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Address {} is {} local", targetAddr, (cached ? "" : "not"));
|
||||||
LOG.trace("Address " + targetAddr +
|
|
||||||
(cached ? " is local" : " is not local"));
|
|
||||||
}
|
|
||||||
return cached;
|
return cached;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean local = NetUtils.isLocalAddress(addr);
|
boolean local = NetUtils.isLocalAddress(addr);
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Address {} is {} local", targetAddr, (local ? "" : "not"));
|
||||||
LOG.trace("Address " + targetAddr +
|
|
||||||
(local ? " is local" : " is not local"));
|
|
||||||
}
|
|
||||||
localAddrMap.put(addr.getHostAddress(), local);
|
localAddrMap.put(addr.getHostAddress(), local);
|
||||||
return local;
|
return local;
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,18 +132,14 @@ class DataStreamer extends Daemon {
|
||||||
final int length, final DFSClient client) throws IOException {
|
final int length, final DFSClient client) throws IOException {
|
||||||
final DfsClientConf conf = client.getConf();
|
final DfsClientConf conf = client.getConf();
|
||||||
final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
|
final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Connecting to datanode {}", dnAddr);
|
||||||
LOG.debug("Connecting to datanode " + dnAddr);
|
|
||||||
}
|
|
||||||
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
|
final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
|
||||||
final Socket sock = client.socketFactory.createSocket();
|
final Socket sock = client.socketFactory.createSocket();
|
||||||
final int timeout = client.getDatanodeReadTimeout(length);
|
final int timeout = client.getDatanodeReadTimeout(length);
|
||||||
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
|
NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
|
||||||
sock.setSoTimeout(timeout);
|
sock.setSoTimeout(timeout);
|
||||||
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Send buf size {}", sock.getSendBufferSize());
|
||||||
LOG.debug("Send buf size " + sock.getSendBufferSize());
|
|
||||||
}
|
|
||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -484,9 +480,7 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void endBlock() {
|
private void endBlock() {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Closing old block {}", block);
|
||||||
LOG.debug("Closing old block " + block);
|
|
||||||
}
|
|
||||||
this.setName("DataStreamer for file " + src);
|
this.setName("DataStreamer for file " + src);
|
||||||
closeResponder();
|
closeResponder();
|
||||||
closeStream();
|
closeStream();
|
||||||
|
@ -567,15 +561,11 @@ class DataStreamer extends Daemon {
|
||||||
|
|
||||||
// get new block from namenode.
|
// get new block from namenode.
|
||||||
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Allocating new block");
|
||||||
LOG.debug("Allocating new block");
|
|
||||||
}
|
|
||||||
setPipeline(nextBlockOutputStream());
|
setPipeline(nextBlockOutputStream());
|
||||||
initDataStreaming();
|
initDataStreaming();
|
||||||
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Append to block {}", block);
|
||||||
LOG.debug("Append to block " + block);
|
|
||||||
}
|
|
||||||
setupPipelineForAppendOrRecovery();
|
setupPipelineForAppendOrRecovery();
|
||||||
if (streamerClosed) {
|
if (streamerClosed) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -627,10 +617,7 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("DataStreamer block {} sending packet {}", block, one);
|
||||||
LOG.debug("DataStreamer block " + block +
|
|
||||||
" sending packet " + one);
|
|
||||||
}
|
|
||||||
|
|
||||||
// write out data to remote datanode
|
// write out data to remote datanode
|
||||||
TraceScope writeScope = dfsClient.getTracer().
|
TraceScope writeScope = dfsClient.getTracer().
|
||||||
|
@ -741,9 +728,7 @@ class DataStreamer extends Daemon {
|
||||||
TraceScope scope = dfsClient.getTracer().
|
TraceScope scope = dfsClient.getTracer().
|
||||||
newScope("waitForAckedSeqno");
|
newScope("waitForAckedSeqno");
|
||||||
try {
|
try {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Waiting for ack for: {}", seqno);
|
||||||
LOG.debug("Waiting for ack for: " + seqno);
|
|
||||||
}
|
|
||||||
long begin = Time.monotonicNow();
|
long begin = Time.monotonicNow();
|
||||||
try {
|
try {
|
||||||
synchronized (dataQueue) {
|
synchronized (dataQueue) {
|
||||||
|
@ -955,8 +940,8 @@ class DataStreamer extends Daemon {
|
||||||
LOG.warn("Slow ReadProcessor read fields took " + duration
|
LOG.warn("Slow ReadProcessor read fields took " + duration
|
||||||
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
|
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
|
||||||
+ ack + ", targets: " + Arrays.asList(targets));
|
+ ack + ", targets: " + Arrays.asList(targets));
|
||||||
} else if (LOG.isDebugEnabled()) {
|
} else {
|
||||||
LOG.debug("DFSClient " + ack);
|
LOG.debug("DFSClient {}", ack);
|
||||||
}
|
}
|
||||||
|
|
||||||
long seqno = ack.getSeqno();
|
long seqno = ack.getSeqno();
|
||||||
|
@ -1176,9 +1161,7 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addDatanode2ExistingPipeline() throws IOException {
|
private void addDatanode2ExistingPipeline() throws IOException {
|
||||||
if (DataTransferProtocol.LOG.isDebugEnabled()) {
|
DataTransferProtocol.LOG.debug("lastAckedSeqno = {}", lastAckedSeqno);
|
||||||
DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
|
|
||||||
}
|
|
||||||
/*
|
/*
|
||||||
* Is data transfer necessary? We have the following cases.
|
* Is data transfer necessary? We have the following cases.
|
||||||
*
|
*
|
||||||
|
@ -1645,10 +1628,8 @@ class DataStreamer extends Daemon {
|
||||||
new HashSet<String>(Arrays.asList(favoredNodes));
|
new HashSet<String>(Arrays.asList(favoredNodes));
|
||||||
for (int i = 0; i < nodes.length; i++) {
|
for (int i = 0; i < nodes.length; i++) {
|
||||||
pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
|
pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{} was chosen by name node (favored={}).",
|
||||||
LOG.debug(nodes[i].getXferAddrWithHostname() +
|
nodes[i].getXferAddrWithHostname(), pinnings[i]);
|
||||||
" was chosen by name node (favored=" + pinnings[i] + ").");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (shouldLog && !favoredSet.isEmpty()) {
|
if (shouldLog && !favoredSet.isEmpty()) {
|
||||||
// There is one or more favored nodes that were not allocated.
|
// There is one or more favored nodes that were not allocated.
|
||||||
|
@ -1787,9 +1768,7 @@ class DataStreamer extends Daemon {
|
||||||
packet.addTraceParent(Tracer.getCurrentSpanId());
|
packet.addTraceParent(Tracer.getCurrentSpanId());
|
||||||
dataQueue.addLast(packet);
|
dataQueue.addLast(packet);
|
||||||
lastQueuedSeqno = packet.getSeqno();
|
lastQueuedSeqno = packet.getSeqno();
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Queued packet {}", packet.getSeqno());
|
||||||
LOG.debug("Queued packet " + packet.getSeqno());
|
|
||||||
}
|
|
||||||
dataQueue.notifyAll();
|
dataQueue.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,9 +250,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
||||||
PacketHeader header = new PacketHeader();
|
PacketHeader header = new PacketHeader();
|
||||||
header.readFields(in);
|
header.readFields(in);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("DFSClient readChunk got header {}", header);
|
||||||
LOG.debug("DFSClient readChunk got header " + header);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check the lengths
|
// Sanity check the lengths
|
||||||
if (!header.sanityCheck(lastSeqNo)) {
|
if (!header.sanityCheck(lastSeqNo)) {
|
||||||
|
|
|
@ -135,14 +135,9 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(byte[] buf, int off, int len)
|
public synchronized int read(byte[] buf, int off, int len)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
UUID randomId = (LOG.isTraceEnabled() ? UUID.randomUUID() : null);
|
||||||
UUID randomId = null;
|
LOG.trace("Starting read #{} file {} from datanode {}",
|
||||||
if (LOG.isTraceEnabled()) {
|
randomId, filename, datanodeID.getHostName());
|
||||||
randomId = UUID.randomUUID();
|
|
||||||
LOG.trace(String.format("Starting read #%s file %s from datanode %s",
|
|
||||||
randomId.toString(), this.filename,
|
|
||||||
this.datanodeID.getHostName()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
||||||
TraceScope scope = tracer.newScope(
|
TraceScope scope = tracer.newScope(
|
||||||
|
@ -154,9 +149,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Finishing read #{}", randomId);
|
||||||
LOG.trace(String.format("Finishing read #" + randomId));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (curDataSlice.remaining() == 0) {
|
if (curDataSlice.remaining() == 0) {
|
||||||
// we're at EOF now
|
// we're at EOF now
|
||||||
|
@ -203,9 +196,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
curDataSlice = packetReceiver.getDataSlice();
|
curDataSlice = packetReceiver.getDataSlice();
|
||||||
assert curDataSlice.capacity() == curHeader.getDataLen();
|
assert curDataSlice.capacity() == curHeader.getDataLen();
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("DFSClient readNextPacket got header {}", curHeader);
|
||||||
LOG.trace("DFSClient readNextPacket got header " + curHeader);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check the lengths
|
// Sanity check the lengths
|
||||||
if (!curHeader.sanityCheck(lastSeqNo)) {
|
if (!curHeader.sanityCheck(lastSeqNo)) {
|
||||||
|
@ -276,10 +267,8 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readTrailingEmptyPacket() throws IOException {
|
private void readTrailingEmptyPacket() throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Reading empty packet at end of read");
|
||||||
LOG.trace("Reading empty packet at end of read");
|
|
||||||
}
|
|
||||||
|
|
||||||
packetReceiver.receiveNextPacket(in);
|
packetReceiver.receiveNextPacket(in);
|
||||||
|
|
||||||
PacketHeader trailer = packetReceiver.getHeader();
|
PacketHeader trailer = packetReceiver.getHeader();
|
||||||
|
|
|
@ -308,10 +308,7 @@ public class LeaseRenewer {
|
||||||
}
|
}
|
||||||
LeaseRenewer.this.run(id);
|
LeaseRenewer.this.run(id);
|
||||||
} catch(InterruptedException e) {
|
} catch(InterruptedException e) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("LeaseRenewer is interrupted.", e);
|
||||||
LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
|
|
||||||
+ " is interrupted.", e);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
synchronized(LeaseRenewer.this) {
|
synchronized(LeaseRenewer.this) {
|
||||||
Factory.INSTANCE.remove(LeaseRenewer.this);
|
Factory.INSTANCE.remove(LeaseRenewer.this);
|
||||||
|
@ -399,9 +396,7 @@ public class LeaseRenewer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (daemonCopy != null) {
|
if (daemonCopy != null) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Wait for lease checker to terminate");
|
||||||
LOG.debug("Wait for lease checker to terminate");
|
|
||||||
}
|
|
||||||
daemonCopy.join();
|
daemonCopy.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -424,16 +419,11 @@ public class LeaseRenewer {
|
||||||
//skip if current client name is the same as the previous name.
|
//skip if current client name is the same as the previous name.
|
||||||
if (!c.getClientName().equals(previousName)) {
|
if (!c.getClientName().equals(previousName)) {
|
||||||
if (!c.renewLease()) {
|
if (!c.renewLease()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Did not renew lease for client {}", c);
|
||||||
LOG.debug("Did not renew lease for client " +
|
|
||||||
c);
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
previousName = c.getClientName();
|
previousName = c.getClientName();
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Lease renewed for client {}", previousName);
|
||||||
LOG.debug("Lease renewed for client " + previousName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,11 +147,9 @@ public class PacketReceiver implements Closeable {
|
||||||
throw new IOException("Invalid header length " + headerLen);
|
throw new IOException("Invalid header length " + headerLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("readNextPacket: dataPlusChecksumLen={}, headerLen={}",
|
||||||
LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
|
dataPlusChecksumLen, headerLen);
|
||||||
" headerLen = " + headerLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sanity check the buffer size so we don't allocate too much memory
|
// Sanity check the buffer size so we don't allocate too much memory
|
||||||
// and OOME.
|
// and OOME.
|
||||||
int totalLen = payloadLen + headerLen;
|
int totalLen = payloadLen + headerLen;
|
||||||
|
|
|
@ -73,10 +73,8 @@ public class Sender implements DataTransferProtocol {
|
||||||
|
|
||||||
private static void send(final DataOutputStream out, final Op opcode,
|
private static void send(final DataOutputStream out, final Op opcode,
|
||||||
final Message proto) throws IOException {
|
final Message proto) throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Sending DataTransferOp {}: {}",
|
||||||
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
|
proto.getClass().getSimpleName(), proto);
|
||||||
+ ": " + proto);
|
|
||||||
}
|
|
||||||
op(out, opcode);
|
op(out, opcode);
|
||||||
proto.writeDelimitedTo(out);
|
proto.writeDelimitedTo(out);
|
||||||
out.flush();
|
out.flush();
|
||||||
|
|
|
@ -332,11 +332,9 @@ public final class DataTransferSaslUtil {
|
||||||
public static IOStreamPair createStreamPair(Configuration conf,
|
public static IOStreamPair createStreamPair(Configuration conf,
|
||||||
CipherOption cipherOption, OutputStream out, InputStream in,
|
CipherOption cipherOption, OutputStream out, InputStream in,
|
||||||
boolean isServer) throws IOException {
|
boolean isServer) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Creating IOStreamPair of CryptoInputStream and "
|
||||||
LOG.debug("Creating IOStreamPair of CryptoInputStream and " +
|
+ "CryptoOutputStream.");
|
||||||
"CryptoOutputStream.");
|
CryptoCodec codec = CryptoCodec.getInstance(conf,
|
||||||
}
|
|
||||||
CryptoCodec codec = CryptoCodec.getInstance(conf,
|
|
||||||
cipherOption.getCipherSuite());
|
cipherOption.getCipherSuite());
|
||||||
byte[] inKey = cipherOption.getInKey();
|
byte[] inKey = cipherOption.getInKey();
|
||||||
byte[] inIv = cipherOption.getInIv();
|
byte[] inIv = cipherOption.getInIv();
|
||||||
|
|
|
@ -137,9 +137,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
|
||||||
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
|
|
||||||
}
|
|
||||||
rpcProxy = createClientDatanodeProtocolProxy(addr,
|
rpcProxy = createClientDatanodeProtocolProxy(addr,
|
||||||
UserGroupInformation.getCurrentUser(), conf,
|
UserGroupInformation.getCurrentUser(), conf,
|
||||||
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
||||||
|
@ -150,10 +148,8 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
||||||
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
|
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
|
||||||
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Connecting to datanode {} addr={}", dnAddr, addr);
|
||||||
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since we're creating a new UserGroupInformation here, we know that no
|
// Since we're creating a new UserGroupInformation here, we know that no
|
||||||
// future RPC proxies will be able to re-use the same connection. And
|
// future RPC proxies will be able to re-use the same connection. And
|
||||||
// usages of this proxy tend to be one-off calls.
|
// usages of this proxy tend to be one-off calls.
|
||||||
|
|
|
@ -129,18 +129,13 @@ public class DfsClientShmManager implements Closeable {
|
||||||
ShmId shmId = shm.getShmId();
|
ShmId shmId = shm.getShmId();
|
||||||
Slot slot = shm.allocAndRegisterSlot(blockId);
|
Slot slot = shm.allocAndRegisterSlot(blockId);
|
||||||
if (shm.isFull()) {
|
if (shm.isFull()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: pulled the last slot {} out of {}",
|
||||||
LOG.trace(this + ": pulled the last slot " + slot.getSlotIdx() +
|
this, slot.getSlotIdx(), shm);
|
||||||
" out of " + shm);
|
|
||||||
}
|
|
||||||
DfsClientShm removedShm = notFull.remove(shmId);
|
DfsClientShm removedShm = notFull.remove(shmId);
|
||||||
Preconditions.checkState(removedShm == shm);
|
Preconditions.checkState(removedShm == shm);
|
||||||
full.put(shmId, shm);
|
full.put(shmId, shm);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: pulled slot {} out of {}", this, slot.getSlotIdx(), shm);
|
||||||
LOG.trace(this + ": pulled slot " + slot.getSlotIdx() +
|
|
||||||
" out of " + shm);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return slot;
|
return slot;
|
||||||
}
|
}
|
||||||
|
@ -187,9 +182,7 @@ public class DfsClientShmManager implements Closeable {
|
||||||
DfsClientShm shm =
|
DfsClientShm shm =
|
||||||
new DfsClientShm(PBHelperClient.convert(resp.getId()),
|
new DfsClientShm(PBHelperClient.convert(resp.getId()),
|
||||||
fis[0], this, peer);
|
fis[0], this, peer);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: createNewShm: created {}", this, shm);
|
||||||
LOG.trace(this + ": createNewShm: created " + shm);
|
|
||||||
}
|
|
||||||
return shm;
|
return shm;
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
|
@ -234,15 +227,11 @@ public class DfsClientShmManager implements Closeable {
|
||||||
String clientName, ExtendedBlockId blockId) throws IOException {
|
String clientName, ExtendedBlockId blockId) throws IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: the DfsClientShmManager has been closed.", this);
|
||||||
LOG.trace(this + ": the DfsClientShmManager has been closed.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (disabled) {
|
if (disabled) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: shared memory segment access is disabled.", this);
|
||||||
LOG.trace(this + ": shared memory segment access is disabled.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
// Try to use an existing slot.
|
// Try to use an existing slot.
|
||||||
|
@ -253,9 +242,7 @@ public class DfsClientShmManager implements Closeable {
|
||||||
// There are no free slots. If someone is loading more slots, wait
|
// There are no free slots. If someone is loading more slots, wait
|
||||||
// for that to finish.
|
// for that to finish.
|
||||||
if (loading) {
|
if (loading) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: waiting for loading to finish...", this);
|
||||||
LOG.trace(this + ": waiting for loading to finish...");
|
|
||||||
}
|
|
||||||
finishedLoading.awaitUninterruptibly();
|
finishedLoading.awaitUninterruptibly();
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, load the slot ourselves.
|
// Otherwise, load the slot ourselves.
|
||||||
|
@ -282,11 +269,9 @@ public class DfsClientShmManager implements Closeable {
|
||||||
// fired and marked the shm as disconnected. In this case, we
|
// fired and marked the shm as disconnected. In this case, we
|
||||||
// obviously don't want to add the SharedMemorySegment to our list
|
// obviously don't want to add the SharedMemorySegment to our list
|
||||||
// of valid not-full segments.
|
// of valid not-full segments.
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: the UNIX domain socket associated with this "
|
||||||
LOG.debug(this + ": the UNIX domain socket associated with " +
|
+ "short-circuit memory closed before we could make use of "
|
||||||
"this short-circuit memory closed before we could make " +
|
+ "the shm.", this);
|
||||||
"use of the shm.");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
notFull.put(shm.getShmId(), shm);
|
notFull.put(shm.getShmId(), shm);
|
||||||
}
|
}
|
||||||
|
@ -309,9 +294,7 @@ public class DfsClientShmManager implements Closeable {
|
||||||
Preconditions.checkState(!full.containsKey(shm.getShmId()));
|
Preconditions.checkState(!full.containsKey(shm.getShmId()));
|
||||||
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
|
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
|
||||||
if (shm.isEmpty()) {
|
if (shm.isEmpty()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: freeing empty stale {}", this, shm);
|
||||||
LOG.trace(this + ": freeing empty stale " + shm);
|
|
||||||
}
|
|
||||||
shm.free();
|
shm.free();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -336,10 +319,8 @@ public class DfsClientShmManager implements Closeable {
|
||||||
// lowest ID, but it could still occur. In most workloads,
|
// lowest ID, but it could still occur. In most workloads,
|
||||||
// fragmentation should not be a major concern, since it doesn't impact
|
// fragmentation should not be a major concern, since it doesn't impact
|
||||||
// peak file descriptor usage or the speed of allocation.
|
// peak file descriptor usage or the speed of allocation.
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: shutting down UNIX domain socket for empty {}",
|
||||||
LOG.trace(this + ": shutting down UNIX domain socket for " +
|
this, shm);
|
||||||
"empty " + shm);
|
|
||||||
}
|
|
||||||
shutdown(shm);
|
shutdown(shm);
|
||||||
} else {
|
} else {
|
||||||
notFull.put(shmId, shm);
|
notFull.put(shmId, shm);
|
||||||
|
|
|
@ -103,9 +103,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
if (ShortCircuitCache.this.closed) return;
|
if (ShortCircuitCache.this.closed) return;
|
||||||
long curMs = Time.monotonicNow();
|
long curMs = Time.monotonicNow();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: cache cleaner running at {}", this, curMs);
|
||||||
LOG.debug(this + ": cache cleaner running at " + curMs);
|
|
||||||
}
|
|
||||||
|
|
||||||
int numDemoted = demoteOldEvictableMmaped(curMs);
|
int numDemoted = demoteOldEvictableMmaped(curMs);
|
||||||
int numPurged = 0;
|
int numPurged = 0;
|
||||||
|
@ -127,11 +125,9 @@ public class ShortCircuitCache implements Closeable {
|
||||||
numPurged++;
|
numPurged++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: finishing cache cleaner run started at {}. Demoted {} "
|
||||||
LOG.debug(this + ": finishing cache cleaner run started at " +
|
+ "mmapped replicas; purged {} replicas.",
|
||||||
curMs + ". Demoted " + numDemoted + " mmapped replicas; " +
|
this, curMs, numDemoted, numPurged);
|
||||||
"purged " + numPurged + " replicas.");
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
ShortCircuitCache.this.lock.unlock();
|
ShortCircuitCache.this.lock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -186,9 +182,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
|
||||||
LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
|
|
||||||
}
|
|
||||||
final DfsClientShm shm = (DfsClientShm)slot.getShm();
|
final DfsClientShm shm = (DfsClientShm)slot.getShm();
|
||||||
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
|
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
|
||||||
final String path = shmSock.getPath();
|
final String path = shmSock.getPath();
|
||||||
|
@ -205,9 +199,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
String error = resp.hasError() ? resp.getError() : "(unknown)";
|
String error = resp.hasError() ? resp.getError() : "(unknown)";
|
||||||
throw new IOException(resp.getStatus().toString() + ": " + error);
|
throw new IOException(resp.getStatus().toString() + ": " + error);
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: released {}", this, slot);
|
||||||
LOG.trace(ShortCircuitCache.this + ": released " + slot);
|
|
||||||
}
|
|
||||||
success = true;
|
success = true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error(ShortCircuitCache.this + ": failed to release " +
|
LOG.error(ShortCircuitCache.this + ": failed to release " +
|
||||||
|
@ -433,9 +425,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
purgeReason = "purging replica because it is stale.";
|
purgeReason = "purging replica because it is stale.";
|
||||||
}
|
}
|
||||||
if (purgeReason != null) {
|
if (purgeReason != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: {}", this, purgeReason);
|
||||||
LOG.debug(this + ": " + purgeReason);
|
|
||||||
}
|
|
||||||
purge(replica);
|
purge(replica);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -677,10 +667,8 @@ public class ShortCircuitCache implements Closeable {
|
||||||
ShortCircuitReplicaInfo info = null;
|
ShortCircuitReplicaInfo info = null;
|
||||||
do {
|
do {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.",
|
||||||
LOG.trace(this + ": can't fetchOrCreate " + key +
|
this, key);
|
||||||
" because the cache is closed.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
|
Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
|
||||||
|
@ -688,9 +676,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
try {
|
try {
|
||||||
info = fetch(key, waitable);
|
info = fetch(key, waitable);
|
||||||
} catch (RetriableException e) {
|
} catch (RetriableException e) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: retrying {}", this, e.getMessage());
|
||||||
LOG.debug(this + ": retrying " + e.getMessage());
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -721,9 +707,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
// ShortCircuitReplica. So we simply wait for it to complete.
|
// ShortCircuitReplica. So we simply wait for it to complete.
|
||||||
ShortCircuitReplicaInfo info;
|
ShortCircuitReplicaInfo info;
|
||||||
try {
|
try {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: found waitable for {}", this, key);
|
||||||
LOG.trace(this + ": found waitable for " + key);
|
|
||||||
}
|
|
||||||
info = waitable.await();
|
info = waitable.await();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info(this + ": interrupted while waiting for " + key);
|
LOG.info(this + ": interrupted while waiting for " + key);
|
||||||
|
@ -765,9 +749,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
// Handle loading a new replica.
|
// Handle loading a new replica.
|
||||||
ShortCircuitReplicaInfo info = null;
|
ShortCircuitReplicaInfo info = null;
|
||||||
try {
|
try {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: loading {}", this, key);
|
||||||
LOG.trace(this + ": loading " + key);
|
|
||||||
}
|
|
||||||
info = creator.createShortCircuitReplicaInfo();
|
info = creator.createShortCircuitReplicaInfo();
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
LOG.warn(this + ": failed to load " + key, e);
|
LOG.warn(this + ": failed to load " + key, e);
|
||||||
|
@ -777,9 +759,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
try {
|
try {
|
||||||
if (info.getReplica() != null) {
|
if (info.getReplica() != null) {
|
||||||
// On success, make sure the cache cleaner thread is running.
|
// On success, make sure the cache cleaner thread is running.
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: successfully loaded {}", this, info.getReplica());
|
||||||
LOG.trace(this + ": successfully loaded " + info.getReplica());
|
|
||||||
}
|
|
||||||
startCacheCleanerThreadIfNeeded();
|
startCacheCleanerThreadIfNeeded();
|
||||||
// Note: new ShortCircuitReplicas start with a refCount of 2,
|
// Note: new ShortCircuitReplicas start with a refCount of 2,
|
||||||
// indicating that both this cache and whoever requested the
|
// indicating that both this cache and whoever requested the
|
||||||
|
@ -811,10 +791,8 @@ public class ShortCircuitCache implements Closeable {
|
||||||
cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
|
cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
cacheCleaner.setFuture(future);
|
cacheCleaner.setFuture(future);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("{}: starting cache cleaner thread which will run every {} ms",
|
||||||
LOG.debug(this + ": starting cache cleaner thread which will run " +
|
this, rateMs);
|
||||||
"every " + rateMs + " ms");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -832,17 +810,12 @@ public class ShortCircuitCache implements Closeable {
|
||||||
long lastAttemptTimeMs = (Long)replica.mmapData;
|
long lastAttemptTimeMs = (Long)replica.mmapData;
|
||||||
long delta = Time.monotonicNow() - lastAttemptTimeMs;
|
long delta = Time.monotonicNow() - lastAttemptTimeMs;
|
||||||
if (delta < mmapRetryTimeoutMs) {
|
if (delta < mmapRetryTimeoutMs) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: can't create client mmap for {} because we failed to"
|
||||||
LOG.trace(this + ": can't create client mmap for " +
|
+ " create one just {}ms ago.", this, replica, delta);
|
||||||
replica + " because we failed to " +
|
|
||||||
"create one just " + delta + "ms ago.");
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: retrying client mmap for {}, {} ms after the previous "
|
||||||
LOG.trace(this + ": retrying client mmap for " + replica +
|
+ "failure.", this, replica, delta);
|
||||||
", " + delta + " ms after the previous failure.");
|
|
||||||
}
|
|
||||||
} else if (replica.mmapData instanceof Condition) {
|
} else if (replica.mmapData instanceof Condition) {
|
||||||
Condition cond = (Condition)replica.mmapData;
|
Condition cond = (Condition)replica.mmapData;
|
||||||
cond.awaitUninterruptibly();
|
cond.awaitUninterruptibly();
|
||||||
|
@ -965,38 +938,10 @@ public class ShortCircuitCache implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, "
|
||||||
StringBuilder builder = new StringBuilder();
|
+ "failedLoads={}, evictable={}, evictableMmapped={}",
|
||||||
builder.append("visiting ").append(visitor.getClass().getName()).
|
visitor.getClass().getName(), outstandingMmapCount, replicas,
|
||||||
append("with outstandingMmapCount=").append(outstandingMmapCount).
|
failedLoads, evictable, evictableMmapped);
|
||||||
append(", replicas=");
|
|
||||||
String prefix = "";
|
|
||||||
for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
|
|
||||||
builder.append(prefix).append(entry.getValue());
|
|
||||||
prefix = ",";
|
|
||||||
}
|
|
||||||
prefix = "";
|
|
||||||
builder.append(", failedLoads=");
|
|
||||||
for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
|
|
||||||
builder.append(prefix).append(entry.getValue());
|
|
||||||
prefix = ",";
|
|
||||||
}
|
|
||||||
prefix = "";
|
|
||||||
builder.append(", evictable=");
|
|
||||||
for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
|
|
||||||
builder.append(prefix).append(entry.getKey()).
|
|
||||||
append(":").append(entry.getValue());
|
|
||||||
prefix = ",";
|
|
||||||
}
|
|
||||||
prefix = "";
|
|
||||||
builder.append(", evictableMmapped=");
|
|
||||||
for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
|
|
||||||
builder.append(prefix).append(entry.getKey()).
|
|
||||||
append(":").append(entry.getValue());
|
|
||||||
prefix = ",";
|
|
||||||
}
|
|
||||||
LOG.debug(builder.toString());
|
|
||||||
}
|
|
||||||
visitor.visit(outstandingMmapCount, replicas, failedLoads,
|
visitor.visit(outstandingMmapCount, replicas, failedLoads,
|
||||||
evictable, evictableMmapped);
|
evictable, evictableMmapped);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -154,25 +154,19 @@ public class ShortCircuitReplica {
|
||||||
// Check staleness by looking at the shared memory area we use to
|
// Check staleness by looking at the shared memory area we use to
|
||||||
// communicate with the DataNode.
|
// communicate with the DataNode.
|
||||||
boolean stale = !slot.isValid();
|
boolean stale = !slot.isValid();
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: checked shared memory segment. isStale={}", this, stale);
|
||||||
LOG.trace(this + ": checked shared memory segment. isStale=" + stale);
|
|
||||||
}
|
|
||||||
return stale;
|
return stale;
|
||||||
} else {
|
} else {
|
||||||
// Fall back to old, time-based staleness method.
|
// Fall back to old, time-based staleness method.
|
||||||
long deltaMs = Time.monotonicNow() - creationTimeMs;
|
long deltaMs = Time.monotonicNow() - creationTimeMs;
|
||||||
long staleThresholdMs = cache.getStaleThresholdMs();
|
long staleThresholdMs = cache.getStaleThresholdMs();
|
||||||
if (deltaMs > staleThresholdMs) {
|
if (deltaMs > staleThresholdMs) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{} is stale because it's {} ms old and staleThreadholdMS={}",
|
||||||
LOG.trace(this + " is stale because it's " + deltaMs +
|
this, deltaMs, staleThresholdMs);
|
||||||
" ms old, and staleThresholdMs = " + staleThresholdMs);
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{} is not stale because it's only {} ms old "
|
||||||
LOG.trace(this + " is not stale because it's only " + deltaMs +
|
+ "and staleThresholdMs={}", this, deltaMs, staleThresholdMs);
|
||||||
" ms old, and staleThresholdMs = " + staleThresholdMs);
|
|
||||||
}
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -194,13 +188,8 @@ public class ShortCircuitReplica {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
boolean result = slot.addAnchor();
|
boolean result = slot.addAnchor();
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: {} no-checksum anchor to slot {}",
|
||||||
if (result) {
|
this, result ? "added" : "could not add", slot);
|
||||||
LOG.trace(this + ": added no-checksum anchor to slot " + slot);
|
|
||||||
} else {
|
|
||||||
LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,9 +252,7 @@ public class ShortCircuitReplica {
|
||||||
suffix += " scheduling " + slot + " for later release.";
|
suffix += " scheduling " + slot + " for later release.";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("closed {}{}", this, suffix);
|
||||||
LOG.trace("closed " + this + suffix);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileInputStream getDataStream() {
|
public FileInputStream getDataStream() {
|
||||||
|
@ -293,9 +280,7 @@ public class ShortCircuitReplica {
|
||||||
FileChannel channel = dataStream.getChannel();
|
FileChannel channel = dataStream.getChannel();
|
||||||
MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
|
MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
|
||||||
Math.min(Integer.MAX_VALUE, channel.size()));
|
Math.min(Integer.MAX_VALUE, channel.size()));
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: created mmap of size {}", this, channel.size());
|
||||||
LOG.trace(this + ": created mmap of size " + channel.size());
|
|
||||||
}
|
|
||||||
return mmap;
|
return mmap;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(this + ": mmap error", e);
|
LOG.warn(this + ": mmap error", e);
|
||||||
|
|
|
@ -484,13 +484,9 @@ public class ShortCircuitShm {
|
||||||
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
|
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
|
||||||
this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
|
this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
|
||||||
this.allocatedSlots = new BitSet(slots.length);
|
this.allocatedSlots = new BitSet(slots.length);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
|
||||||
LOG.trace("creating " + this.getClass().getSimpleName() +
|
+ "slots.length={})", this.getClass().getSimpleName(), shmId,
|
||||||
"(shmId=" + shmId +
|
mmappedLength, String.format("%x", baseAddress), slots.length);
|
||||||
", mmappedLength=" + mmappedLength +
|
|
||||||
", baseAddress=" + String.format("%x", baseAddress) +
|
|
||||||
", slots.length=" + slots.length + ")");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public final ShmId getShmId() {
|
public final ShmId getShmId() {
|
||||||
|
@ -615,9 +611,7 @@ public class ShortCircuitShm {
|
||||||
"tried to unregister slot " + slotIdx + ", which was not registered.");
|
"tried to unregister slot " + slotIdx + ", which was not registered.");
|
||||||
allocatedSlots.set(slotIdx, false);
|
allocatedSlots.set(slotIdx, false);
|
||||||
slots[slotIdx] = null;
|
slots[slotIdx] = null;
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("{}: unregisterSlot {}", this, slotIdx);
|
||||||
LOG.trace(this + ": unregisterSlot " + slotIdx);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,18 +36,6 @@ import org.slf4j.LoggerFactory;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class ByteArrayManager {
|
public abstract class ByteArrayManager {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
|
static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class);
|
||||||
private static final ThreadLocal<StringBuilder> DEBUG_MESSAGE =
|
|
||||||
new ThreadLocal<StringBuilder>() {
|
|
||||||
protected StringBuilder initialValue() {
|
|
||||||
return new StringBuilder();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
private static void logDebugMessage() {
|
|
||||||
final StringBuilder b = DEBUG_MESSAGE.get();
|
|
||||||
LOG.debug(b.toString());
|
|
||||||
b.setLength(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static final int MIN_ARRAY_LENGTH = 32;
|
static final int MIN_ARRAY_LENGTH = 32;
|
||||||
static final byte[] EMPTY_BYTE_ARRAY = {};
|
static final byte[] EMPTY_BYTE_ARRAY = {};
|
||||||
|
@ -160,27 +148,18 @@ public abstract class ByteArrayManager {
|
||||||
* via the {@link FixedLengthManager#recycle(byte[])} method.
|
* via the {@link FixedLengthManager#recycle(byte[])} method.
|
||||||
*/
|
*/
|
||||||
synchronized byte[] allocate() throws InterruptedException {
|
synchronized byte[] allocate() throws InterruptedException {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", {}", this);
|
||||||
DEBUG_MESSAGE.get().append(", ").append(this);
|
|
||||||
}
|
|
||||||
for(; numAllocated >= maxAllocated;) {
|
for(; numAllocated >= maxAllocated;) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(": wait ...");
|
||||||
DEBUG_MESSAGE.get().append(": wait ...");
|
|
||||||
logDebugMessage();
|
|
||||||
}
|
|
||||||
|
|
||||||
wait();
|
wait();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("wake up: {}", this);
|
||||||
DEBUG_MESSAGE.get().append("wake up: ").append(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
numAllocated++;
|
numAllocated++;
|
||||||
|
|
||||||
final byte[] array = freeQueue.poll();
|
final byte[] array = freeQueue.poll();
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", recycled? {}", array != null);
|
||||||
DEBUG_MESSAGE.get().append(", recycled? ").append(array != null);
|
|
||||||
}
|
|
||||||
return array != null? array : new byte[byteArrayLength];
|
return array != null? array : new byte[byteArrayLength];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,9 +173,7 @@ public abstract class ByteArrayManager {
|
||||||
synchronized int recycle(byte[] array) {
|
synchronized int recycle(byte[] array) {
|
||||||
Preconditions.checkNotNull(array);
|
Preconditions.checkNotNull(array);
|
||||||
Preconditions.checkArgument(array.length == byteArrayLength);
|
Preconditions.checkArgument(array.length == byteArrayLength);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", {}", this);
|
||||||
DEBUG_MESSAGE.get().append(", ").append(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
notify();
|
notify();
|
||||||
numAllocated--;
|
numAllocated--;
|
||||||
|
@ -207,9 +184,7 @@ public abstract class ByteArrayManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (freeQueue.size() < maxAllocated - numAllocated) {
|
if (freeQueue.size() < maxAllocated - numAllocated) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", freeQueue.offer");
|
||||||
DEBUG_MESSAGE.get().append(", freeQueue.offer");
|
|
||||||
}
|
|
||||||
freeQueue.offer(array);
|
freeQueue.offer(array);
|
||||||
}
|
}
|
||||||
return freeQueue.size();
|
return freeQueue.size();
|
||||||
|
@ -349,9 +324,7 @@ public abstract class ByteArrayManager {
|
||||||
public byte[] newByteArray(final int arrayLength)
|
public byte[] newByteArray(final int arrayLength)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
Preconditions.checkArgument(arrayLength >= 0);
|
Preconditions.checkArgument(arrayLength >= 0);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("allocate({})", arrayLength);
|
||||||
DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")");
|
|
||||||
}
|
|
||||||
|
|
||||||
final byte[] array;
|
final byte[] array;
|
||||||
if (arrayLength == 0) {
|
if (arrayLength == 0) {
|
||||||
|
@ -365,18 +338,12 @@ public abstract class ByteArrayManager {
|
||||||
final FixedLengthManager manager =
|
final FixedLengthManager manager =
|
||||||
managers.get(powerOfTwo, aboveThreshold);
|
managers.get(powerOfTwo, aboveThreshold);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(": count={}, {}Threshold", count,
|
||||||
DEBUG_MESSAGE.get().append(": count=").append(count)
|
aboveThreshold ? "above" : "below");
|
||||||
.append(aboveThreshold? ", aboveThreshold": ", belowThreshold");
|
|
||||||
}
|
|
||||||
array = manager != null? manager.allocate(): new byte[powerOfTwo];
|
array = manager != null? manager.allocate(): new byte[powerOfTwo];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", return byte[{}]", array.length);
|
||||||
DEBUG_MESSAGE.get().append(", return byte[")
|
|
||||||
.append(array.length).append("]");
|
|
||||||
logDebugMessage();
|
|
||||||
}
|
|
||||||
return array;
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -391,10 +358,7 @@ public abstract class ByteArrayManager {
|
||||||
@Override
|
@Override
|
||||||
public int release(final byte[] array) {
|
public int release(final byte[] array) {
|
||||||
Preconditions.checkNotNull(array);
|
Preconditions.checkNotNull(array);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("recycle: array.length={}", array.length);
|
||||||
DEBUG_MESSAGE.get()
|
|
||||||
.append("recycle: array.length=").append(array.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
final int freeQueueSize;
|
final int freeQueueSize;
|
||||||
if (array.length == 0) {
|
if (array.length == 0) {
|
||||||
|
@ -404,10 +368,7 @@ public abstract class ByteArrayManager {
|
||||||
freeQueueSize = manager == null? -1: manager.recycle(array);
|
freeQueueSize = manager == null? -1: manager.recycle(array);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug(", freeQueueSize={}", freeQueueSize);
|
||||||
DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize);
|
|
||||||
logDebugMessage();
|
|
||||||
}
|
|
||||||
return freeQueueSize;
|
return freeQueueSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -140,9 +140,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
fs.setDelegationToken(token);
|
fs.setDelegationToken(token);
|
||||||
addRenewAction(fs);
|
addRenewAction(fs);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Created new DT for {}", token.getService());
|
||||||
LOG.debug("Created new DT for {}", token.getService());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
hasInitedToken = true;
|
hasInitedToken = true;
|
||||||
}
|
}
|
||||||
|
@ -155,9 +153,7 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
||||||
synchronized void initDelegationToken(UserGroupInformation ugi) {
|
synchronized void initDelegationToken(UserGroupInformation ugi) {
|
||||||
Token<?> token = selectDelegationToken(ugi);
|
Token<?> token = selectDelegationToken(ugi);
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Found existing DT for {}", token.getService());
|
||||||
LOG.debug("Found existing DT for {}", token.getService());
|
|
||||||
}
|
|
||||||
fs.setDelegationToken(token);
|
fs.setDelegationToken(token);
|
||||||
hasInitedToken = true;
|
hasInitedToken = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,9 +182,7 @@ public class URLConnectionFactory {
|
||||||
public URLConnection openConnection(URL url, boolean isSpnego)
|
public URLConnection openConnection(URL url, boolean isSpnego)
|
||||||
throws IOException, AuthenticationException {
|
throws IOException, AuthenticationException {
|
||||||
if (isSpnego) {
|
if (isSpnego) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("open AuthenticatedURL connection {}", url);
|
||||||
LOG.debug("open AuthenticatedURL connection {}", url);
|
|
||||||
}
|
|
||||||
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
|
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
|
||||||
final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
||||||
return new AuthenticatedURL(new KerberosUgiAuthenticator(),
|
return new AuthenticatedURL(new KerberosUgiAuthenticator(),
|
||||||
|
|
|
@ -240,16 +240,12 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
// refetch tokens. even if ugi has credentials, don't attempt
|
// refetch tokens. even if ugi has credentials, don't attempt
|
||||||
// to get another token to match hdfs/rpc behavior
|
// to get another token to match hdfs/rpc behavior
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Using UGI token: {}", token);
|
||||||
LOG.debug("Using UGI token: {}", token);
|
|
||||||
}
|
|
||||||
canRefreshDelegationToken = false;
|
canRefreshDelegationToken = false;
|
||||||
} else {
|
} else {
|
||||||
token = getDelegationToken(null);
|
token = getDelegationToken(null);
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Fetched new token: {}", token);
|
||||||
LOG.debug("Fetched new token: {}", token);
|
|
||||||
}
|
|
||||||
} else { // security is disabled
|
} else { // security is disabled
|
||||||
canRefreshDelegationToken = false;
|
canRefreshDelegationToken = false;
|
||||||
}
|
}
|
||||||
|
@ -264,9 +260,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
boolean replaced = false;
|
boolean replaced = false;
|
||||||
if (canRefreshDelegationToken) {
|
if (canRefreshDelegationToken) {
|
||||||
Token<?> token = getDelegationToken(null);
|
Token<?> token = getDelegationToken(null);
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug("Replaced expired token: {}", token);
|
||||||
LOG.debug("Replaced expired token: {}", token);
|
|
||||||
}
|
|
||||||
setDelegationToken(token);
|
setDelegationToken(token);
|
||||||
replaced = (token != null);
|
replaced = (token != null);
|
||||||
}
|
}
|
||||||
|
@ -451,9 +445,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
InetSocketAddress nnAddr = getCurrentNNAddr();
|
InetSocketAddress nnAddr = getCurrentNNAddr();
|
||||||
final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
|
final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
|
||||||
nnAddr.getPort(), path + '?' + query);
|
nnAddr.getPort(), path + '?' + query);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("url={}", url);
|
||||||
LOG.trace("url={}", url);
|
|
||||||
}
|
|
||||||
return url;
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -488,9 +480,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
+ Param.toSortedString("&", getAuthParameters(op))
|
+ Param.toSortedString("&", getAuthParameters(op))
|
||||||
+ Param.toSortedString("&", parameters);
|
+ Param.toSortedString("&", parameters);
|
||||||
final URL url = getNamenodeURL(path, query);
|
final URL url = getNamenodeURL(path, query);
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("url={}", url);
|
||||||
LOG.trace("url={}", url);
|
|
||||||
}
|
|
||||||
return url;
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,9 +768,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
} catch (Exception e) { // catch json parser errors
|
} catch (Exception e) { // catch json parser errors
|
||||||
final IOException ioe =
|
final IOException ioe =
|
||||||
new IOException("Response decoding failure: "+e.toString(), e);
|
new IOException("Response decoding failure: "+e.toString(), e);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Response decoding failure.", e);
|
||||||
LOG.debug("Response decoding failure: {}", e.toString(), e);
|
|
||||||
}
|
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
|
@ -1251,9 +1239,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
cancelDelegationToken(delegationToken);
|
cancelDelegationToken(delegationToken);
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Token cancel failed: ", ioe);
|
||||||
LOG.debug("Token cancel failed: ", ioe);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
super.close();
|
super.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -655,6 +655,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8696. Make the lower and higher watermark in the DN Netty server
|
HDFS-8696. Make the lower and higher watermark in the DN Netty server
|
||||||
configurable. (Xiaobing Zhou via wheat9)
|
configurable. (Xiaobing Zhou via wheat9)
|
||||||
|
|
||||||
|
HDFS-8971. Remove guards when calling LOG.debug() and LOG.trace() in client
|
||||||
|
package. (Mingliang Liu via wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
Loading…
Reference in New Issue