From 2ebc126b9e6a8a4ee4a533349aee77425b167074 Mon Sep 17 00:00:00 2001
From: Colin McCabe
Date: Wed, 18 Dec 2013 23:30:08 +0000
Subject: [PATCH] HDFS-5676. fix inconsistent synchronization of
 CachingStrategy (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1552164 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt       |  6 ++--
 .../apache/hadoop/hdfs/DFSInputStream.java        | 34 +++++++++++-------
 .../apache/hadoop/hdfs/DFSOutputStream.java       | 17 ++++++---
 .../hdfs/server/datanode/CachingStrategy.java     | 36 ++++++++++++-------
 .../org/apache/hadoop/hdfs/TestConnCache.java     |  4 ++-
 5 files changed, 65 insertions(+), 32 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a4313288ec9..bb2341210fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
     FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
     change Some fields in FSEditLog to final.  (szetszwo)
 
+    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -230,8 +233,7 @@ Release 2.4.0 - UNRELEASED
     HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
     (Binglin Chang via junping_du)
 
-    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
-    (cmccabe)
+    HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
 
 Release 2.3.0 - UNRELEASED
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index ad2cb7adce1..fe5270ea906 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -228,7 +228,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy().duplicate();
+        dfsClient.getDefaultReadCachingStrategy();
     openInfo();
   }
 
@@ -574,7 +574,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
     blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
         accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-        buffersize, verifyChecksum, dfsClient.clientName);
+        buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
     if(connectFailedOnce) {
       DFSClient.LOG.info("Successfully connected to " + targetAddr +
                          " for " + blk);
@@ -928,7 +928,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      block = getBlockAt(block.getStartOffset(), false);
+      CachingStrategy curCachingStrategy;
+      synchronized (this) {
+        block = getBlockAt(block.getStartOffset(), false);
+        curCachingStrategy = cachingStrategy;
+      }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
@@ -940,7 +944,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName);
+            dfsClient.clientName, curCachingStrategy);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -1053,6 +1057,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param curCachingStrategy  caching strategy to use
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@@ -1064,7 +1069,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
                                        long len,
                                        int bufferSize,
                                        boolean verifyChecksum,
-                                       String clientName)
+                                       String clientName,
+                                       CachingStrategy curCachingStrategy)
       throws IOException {
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.
@@ -1084,7 +1090,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           setBlockMetadataHeader(BlockMetadataHeader.
               preadHeader(fis[1].getChannel())).
           setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(cachingStrategy).
+          setCachingStrategy(curCachingStrategy).
           build();
     }
 
@@ -1119,7 +1125,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1142,7 +1148,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1166,7 +1172,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache, false,
-            cachingStrategy);
+            curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1186,7 +1192,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         dfsClient.getConf(), file, block, blockToken, startOffset, len,
         verifyChecksum, clientName, peer, chosenNode, dsFactory,
         peerCache, fileInputStreamCache, false,
-        cachingStrategy);
+        curCachingStrategy);
   }
 
 
@@ -1460,14 +1466,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy.setReadahead(readahead);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setReadahead(readahead).build();
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setDropBehind(dropBehind).build();
     closeCurrentBlockReader();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 635dff190ad..60e6c274345 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSOutputSummer
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private CachingStrategy cachingStrategy;
+  private AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
 
   private class Packet {
@@ -1170,7 +1170,7 @@ public class DFSOutputStream extends FSOutputSummer
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy);
+              cachingStrategy.get());
 
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1364,8 +1364,8 @@ public class DFSOutputStream extends FSOutputSummer
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
-    this.cachingStrategy =
-        dfsClient.getDefaultWriteCachingStrategy().duplicate();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1982,7 +1982,14 @@ public class DFSOutputStream extends FSOutputSummer
 
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable.  So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+          setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
index 3795dbba3cb..215df134669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.datanode;
  * The caching strategy we should use for an HDFS read or write operation.
  */
 public class CachingStrategy {
-  private Boolean dropBehind; // null = use server defaults
-  private Long readahead; // null = use server defaults
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
 
   public static CachingStrategy newDefaultStrategy() {
     return new CachingStrategy(null, null);
@@ -32,8 +32,28 @@ public class CachingStrategy {
     return new CachingStrategy(true, null);
   }
 
-  public CachingStrategy duplicate() {
-    return new CachingStrategy(this.dropBehind, this.readahead);
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
   }
 
   public CachingStrategy(Boolean dropBehind, Long readahead) {
@@ -45,18 +65,10 @@
     return dropBehind;
   }
 
-  public void setDropBehind(Boolean dropBehind) {
-    this.dropBehind = dropBehind;
-  }
-
   public Long getReadahead() {
     return readahead;
   }
 
-  public void setReadahead(Long readahead) {
-    this.readahead = readahead;
-  }
-
   public String toString() {
     return "CachingStrategy(dropBehind=" + dropBehind +
         ", readahead=" + readahead + ")";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
index 837e5523210..cffd91dfa47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
@@ -138,7 +139,8 @@ public class TestConnCache {
         Matchers.anyLong(),
         Matchers.anyInt(),
         Matchers.anyBoolean(),
-        Matchers.anyString());
+        Matchers.anyString(),
+        (CachingStrategy)Matchers.anyObject());
 
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
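
Note on the pattern this patch adopts: CachingStrategy becomes immutable (all
fields final, the setters replaced by a Builder), so the read path can take a
consistent snapshot of the reference, while DFSOutputStream keeps the current
strategy in an AtomicReference and updates it with a compare-and-swap retry
loop. The following is a minimal, self-contained sketch of that copy-on-write
update; the names Strategy, withDropBehind, and CasUpdateSketch are
illustrative only and do not appear anywhere in the patch.

import java.util.concurrent.atomic.AtomicReference;

// Illustrative immutable strategy object, mirroring CachingStrategy after
// this patch: final fields, modification by building a modified copy.
final class Strategy {
  final Boolean dropBehind;  // null = use server defaults
  final Long readahead;      // null = use server defaults

  Strategy(Boolean dropBehind, Long readahead) {
    this.dropBehind = dropBehind;
    this.readahead = readahead;
  }

  // Equivalent in spirit to
  // new CachingStrategy.Builder(prev).setDropBehind(d).build().
  Strategy withDropBehind(Boolean dropBehind) {
    return new Strategy(dropBehind, this.readahead);
  }

  @Override
  public String toString() {
    return "Strategy(dropBehind=" + dropBehind +
        ", readahead=" + readahead + ")";
  }
}

public class CasUpdateSketch {
  private final AtomicReference<Strategy> strategy =
      new AtomicReference<Strategy>(new Strategy(null, null));

  // Same shape as the do/while loop in DFSOutputStream#setDropBehind above:
  // read the current reference, derive a modified copy, and publish it only
  // if no other thread swapped the reference in the meantime; else retry.
  public void setDropBehind(Boolean dropBehind) {
    Strategy prev, next;
    do {
      prev = strategy.get();
      next = prev.withDropBehind(dropBehind);
    } while (!strategy.compareAndSet(prev, next));
  }

  public static void main(String[] args) {
    CasUpdateSketch s = new CasUpdateSketch();
    s.setDropBehind(true);
    // Prints: Strategy(dropBehind=true, readahead=null)
    System.out.println(s.strategy.get());
  }
}

The two files use different techniques for the same reason. DFSInputStream's
setReadahead/setDropBehind are already synchronized, so it can simply
reassign the field and snapshot it inside one synchronized block in the
pread path. DFSOutputStream's setDropBehind can race with the streamer
thread reading the strategy, so it uses the lock-free CAS loop instead.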