diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 991526c8b00..deb1a55114a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -241,9 +241,6 @@ Trunk (Unreleased)
     HDFS-5431. Support cachepool-based limit management in path-based caching
     (awang via cmccabe)
 
-    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
-    (cmccabe)
-
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@@ -754,6 +751,9 @@ Release 2.4.0 - UNRELEASED
     FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
     change Some fields in FSEditLog to final. (szetszwo)
 
+    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -809,6 +809,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
     (Binglin Chang via junping_du)
 
+    HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0ab51c7e716..bdc660d484a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -228,7 +228,7 @@ void addToDeadNodes(DatanodeInfo dnInfo) {
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy().duplicate();
+        dfsClient.getDefaultReadCachingStrategy();
     openInfo();
   }
 
@@ -574,7 +574,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
       Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
       blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
           accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-          buffersize, verifyChecksum, dfsClient.clientName);
+          buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
       if(connectFailedOnce) {
         DFSClient.LOG.info("Successfully connected to " + targetAddr +
                            " for " + blk);
@@ -928,7 +928,11 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
       // cached block locations may have been updated by chooseDataNode()
      // or fetchBlockAt(). Always get the latest list of locations at the
      // start of the loop.
-      block = getBlockAt(block.getStartOffset(), false);
+      CachingStrategy curCachingStrategy;
+      synchronized (this) {
+        block = getBlockAt(block.getStartOffset(), false);
+        curCachingStrategy = cachingStrategy;
+      }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
@@ -940,7 +944,7 @@ private void fetchBlockByteRange(LocatedBlock block, long start, long end,
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src,
             block.getBlock(), blockToken, start, len, buffersize,
-            verifyChecksum, dfsClient.clientName);
+            verifyChecksum, dfsClient.clientName, curCachingStrategy);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -1053,6 +1057,7 @@ private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
    * @param bufferSize The IO buffer size (not the client buffer size)
    * @param verifyChecksum Whether to verify checksum
    * @param clientName Client name
+   * @param curCachingStrategy caching strategy to use
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@@ -1064,7 +1069,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
                                        long len,
                                        int bufferSize,
                                        boolean verifyChecksum,
-                                       String clientName)
+                                       String clientName,
+                                       CachingStrategy curCachingStrategy)
       throws IOException {
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks. If so, we can just re-use those file descriptors.
@@ -1084,7 +1090,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
           setBlockMetadataHeader(BlockMetadataHeader.
               preadHeader(fis[1].getChannel())).
           setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(cachingStrategy).
+          setCachingStrategy(curCachingStrategy).
           build();
     }
 
@@ -1119,7 +1125,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1142,7 +1148,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1166,7 +1172,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache, false,
-            cachingStrategy);
+            curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1186,7 +1192,7 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
         dfsClient.getConf(), file, block, blockToken, startOffset, len,
         verifyChecksum, clientName, peer, chosenNode, dsFactory,
         peerCache, fileInputStreamCache, false,
-        cachingStrategy);
+        curCachingStrategy);
   }
 
@@ -1460,14 +1466,18 @@ private synchronized void closeCurrentBlockReader() {
 
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy.setReadahead(readahead);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setReadahead(readahead).build();
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setDropBehind(dropBehind).build();
     closeCurrentBlockReader();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 26e2fc2247e..4f4e71a944d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSOutputSummer
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private CachingStrategy cachingStrategy;
+  private AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
 
   private static class Packet {
@@ -1183,7 +1183,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy);
+              cachingStrategy.get());
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1378,8 +1378,8 @@ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
-    this.cachingStrategy =
-        dfsClient.getDefaultWriteCachingStrategy().duplicate();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1993,7 +1993,14 @@ synchronized Token<BlockTokenIdentifier> getBlockToken() {
 
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable. So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+          setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
index 3795dbba3cb..215df134669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
@@ -21,8 +21,8 @@
  * The caching strategy we should use for an HDFS read or write operation.
  */
 public class CachingStrategy {
-  private Boolean dropBehind; // null = use server defaults
-  private Long readahead; // null = use server defaults
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
 
   public static CachingStrategy newDefaultStrategy() {
     return new CachingStrategy(null, null);
@@ -32,8 +32,28 @@ public static CachingStrategy newDropBehind() {
     return new CachingStrategy(true, null);
   }
 
-  public CachingStrategy duplicate() {
-    return new CachingStrategy(this.dropBehind, this.readahead);
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
   }
 
   public CachingStrategy(Boolean dropBehind, Long readahead) {
@@ -45,18 +65,10 @@ public Boolean getDropBehind() {
     return dropBehind;
   }
 
-  public void setDropBehind(Boolean dropBehind) {
-    this.dropBehind = dropBehind;
-  }
-
   public Long getReadahead() {
     return readahead;
   }
 
-  public void setReadahead(Long readahead) {
-    this.readahead = readahead;
-  }
-
   public String toString() {
     return "CachingStrategy(dropBehind=" + dropBehind +
         ", readahead=" + readahead + ")";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
index 837e5523210..cffd91dfa47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
@@ -138,7 +139,8 @@ public void testReadFromOneDN() throws Exception {
            Matchers.anyLong(),
            Matchers.anyInt(),
            Matchers.anyBoolean(),
-           Matchers.anyString());
+           Matchers.anyString(),
+           (CachingStrategy)Matchers.anyObject());
 
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
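Reviewer note: the heart of this patch is a standard lock-free publication pattern. CachingStrategy becomes immutable (final fields, no setters), modified copies are built through CachingStrategy.Builder, and DFSOutputStream publishes updates through an AtomicReference compare-and-swap loop, while DFSInputStream snapshots the current strategy under its own lock before using it outside that lock. The sketch below is a minimal standalone illustration of that pattern, not HDFS source; the names CachingStrategySketch, Strategy, and snapshot() are invented for the example.

import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the pattern this patch adopts: an immutable value
// object, rebuilt via a Builder and swapped in with compareAndSet, so
// readers never need a lock and writers never lose a concurrent update.
public class CachingStrategySketch {

  // Immutable value object: final fields mean any thread that obtains a
  // reference sees a consistent (dropBehind, readahead) pair.
  static final class Strategy {
    final Boolean dropBehind; // null = use server defaults
    final Long readahead;     // null = use server defaults

    Strategy(Boolean dropBehind, Long readahead) {
      this.dropBehind = dropBehind;
      this.readahead = readahead;
    }

    static final class Builder {
      private Boolean dropBehind;
      private Long readahead;

      Builder(Strategy prev) { // start from an existing snapshot
        this.dropBehind = prev.dropBehind;
        this.readahead = prev.readahead;
      }

      Builder setDropBehind(Boolean dropBehind) {
        this.dropBehind = dropBehind;
        return this;
      }

      Builder setReadahead(Long readahead) {
        this.readahead = readahead;
        return this;
      }

      Strategy build() {
        return new Strategy(dropBehind, readahead);
      }
    }
  }

  private final AtomicReference<Strategy> strategy =
      new AtomicReference<Strategy>(new Strategy(null, null));

  // Writer side: retry until the swap succeeds. If another thread changed
  // the reference between get() and compareAndSet(), the loop rebuilds from
  // the fresh snapshot, so neither update is lost.
  void setDropBehind(Boolean dropBehind) {
    Strategy prev, next;
    do {
      prev = strategy.get();
      next = new Strategy.Builder(prev).setDropBehind(dropBehind).build();
    } while (!strategy.compareAndSet(prev, next));
  }

  // Reader side: a single volatile read yields an immutable snapshot that
  // stays valid for the whole operation without holding any lock.
  Strategy snapshot() {
    return strategy.get();
  }

  public static void main(String[] args) {
    CachingStrategySketch s = new CachingStrategySketch();
    s.setDropBehind(true);
    System.out.println("dropBehind=" + s.snapshot().dropBehind);
  }
}

The CAS loop in DFSOutputStream.setDropBehind (as opposed to simply synchronizing the method) presumably lets the setter race safely with the DataStreamer thread, which only ever does cachingStrategy.get(); readers pay one volatile read and writers block no one.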