HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1552164 13f79535-47bb-0310-9956-ffa450edef68
Author: Colin McCabe
Date:   2013-12-18 23:30:08 +00:00
parent 48a9215359
commit 2ebc126b9e

5 changed files with 65 additions and 32 deletions
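The change makes CachingStrategy immutable and replaces in-place mutation
with reference swaps: DFSInputStream publishes a new strategy object while
holding its own lock, and DFSOutputStream keeps its strategy in an
AtomicReference updated by compare-and-swap. Before the per-file diffs, a
minimal, self-contained sketch of the data race the old mutable, shared
strategy allowed (illustrative names, not the HDFS classes):

    // Illustrative only: a mutable strategy shared across threads without
    // synchronization, in the spirit of the pre-HDFS-5676 code.
    class MutableStrategy {
      Boolean dropBehind; // null = use server defaults
      Long readahead;     // null = use server defaults
    }

    public class SharedStrategyRace {
      private static final MutableStrategy STRATEGY = new MutableStrategy();

      public static void main(String[] args) throws InterruptedException {
        // Writer: flips settings, as the old setDropBehind()/setReadahead() did.
        Thread writer = new Thread(() -> {
          for (int i = 0; i < 1_000_000; i++) {
            STRATEGY.dropBehind = (i % 2 == 0);
            STRATEGY.readahead = (long) i;
          }
        });
        // Reader: with no happens-before edge, it may observe a torn pair
        // (new dropBehind with old readahead) or arbitrarily stale values.
        Thread reader = new Thread(() -> {
          for (int i = 0; i < 1_000_000; i++) {
            Boolean d = STRATEGY.dropBehind;
            Long r = STRATEGY.readahead;
            if (d != null && r != null && d && (r % 2 != 0)) {
              // d and r came from different updates: an inconsistent snapshot.
            }
          }
        });
        writer.start(); reader.start();
        writer.join(); reader.join();
      }
    }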

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
     FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
     change Some fields in FSEditLog to final.  (szetszwo)
 
+    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)

@@ -230,8 +233,7 @@ Release 2.4.0 - UNRELEASED
     HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
     (Binglin Chang via junping_du)
 
-    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
-    (cmccabe)
+    HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
 
 Release 2.3.0 - UNRELEASED

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -228,7 +228,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy().duplicate();
+        dfsClient.getDefaultReadCachingStrategy();
     openInfo();
   }

@@ -574,7 +574,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
       blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
           accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-          buffersize, verifyChecksum, dfsClient.clientName);
+          buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
       if(connectFailedOnce) {
         DFSClient.LOG.info("Successfully connected to " + targetAddr +
                            " for " + blk);

@@ -928,7 +928,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         // cached block locations may have been updated by chooseDataNode()
         // or fetchBlockAt(). Always get the latest list of locations at the
         // start of the loop.
-        block = getBlockAt(block.getStartOffset(), false);
+        CachingStrategy curCachingStrategy;
+        synchronized (this) {
+          block = getBlockAt(block.getStartOffset(), false);
+          curCachingStrategy = cachingStrategy;
+        }
         DNAddrPair retval = chooseDataNode(block);
         DatanodeInfo chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
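The @@ -928 hunk above is the snapshot idiom: take the block location and the
current caching strategy together under the stream's lock, then do the slow
datanode read without holding it. A condensed, self-contained sketch of the
same pattern (illustrative names, not the real DFSInputStream members):

    import java.io.IOException;

    // Snapshot shared state under the lock; do slow I/O outside it. This is
    // safe because the strategy object is immutable once published.
    class SnapshotUnderLock {
      // Immutable value object standing in for CachingStrategy.
      static final class Strategy {
        final Boolean dropBehind;
        final Long readahead;
        Strategy(Boolean dropBehind, Long readahead) {
          this.dropBehind = dropBehind;
          this.readahead = readahead;
        }
      }

      private Strategy strategy = new Strategy(null, null); // guarded by 'this'

      byte[] fetchRange(long start, int len) throws IOException {
        Strategy cur;
        synchronized (this) {
          cur = strategy;   // consistent snapshot; still valid after unlock
        }
        return readFromDatanode(start, len, cur); // slow I/O, no lock held
      }

      private byte[] readFromDatanode(long start, int len, Strategy s)
          throws IOException {
        return new byte[len]; // stand-in for the real network read
      }
    }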
@@ -940,7 +944,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName);
+            dfsClient.clientName, curCachingStrategy);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +

@@ -1053,6 +1057,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param curCachingStrategy  caching strategy to use
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,

@@ -1064,7 +1069,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
                                        long len,
                                        int bufferSize,
                                        boolean verifyChecksum,
-                                       String clientName)
+                                       String clientName,
+                                       CachingStrategy curCachingStrategy)
       throws IOException {
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.

@@ -1084,7 +1090,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           setBlockMetadataHeader(BlockMetadataHeader.
               preadHeader(fis[1].getChannel())).
           setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(cachingStrategy).
+          setCachingStrategy(curCachingStrategy).
           build();
     }

@@ -1119,7 +1125,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +

@@ -1142,7 +1148,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);

@@ -1166,7 +1172,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           dfsClient.getConf(), file, block, blockToken, startOffset,
           len, verifyChecksum, clientName, peer, chosenNode,
           dsFactory, peerCache, fileInputStreamCache, false,
-          cachingStrategy);
+          curCachingStrategy);
       return reader;
     } catch (IOException ex) {
       DFSClient.LOG.debug("Error making BlockReader. Closing stale " +

@@ -1186,7 +1192,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         dfsClient.getConf(), file, block, blockToken, startOffset,
         len, verifyChecksum, clientName, peer, chosenNode,
         dsFactory, peerCache, fileInputStreamCache, false,
-        cachingStrategy);
+        curCachingStrategy);
   }
@@ -1460,14 +1466,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy.setReadahead(readahead);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setReadahead(readahead).build();
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setDropBehind(dropBehind).build();
     closeCurrentBlockReader();
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSOutputSummer
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private CachingStrategy cachingStrategy;
+  private AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
 
   private class Packet {

@@ -1170,7 +1170,7 @@ public class DFSOutputStream extends FSOutputSummer
         new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
             nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
             nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-            cachingStrategy);
+            cachingStrategy.get());
 
         // receive ack for connect
         BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(

@@ -1364,8 +1364,8 @@ public class DFSOutputStream extends FSOutputSummer
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
-    this.cachingStrategy =
-        dfsClient.getDefaultWriteCachingStrategy().duplicate();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);

@@ -1982,7 +1982,14 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable.  So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+          setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
   }
 
   @VisibleForTesting
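DFSOutputStream cannot rely on a single lock here: the block-writing thread
reads the strategy (the writeBlock call above) while user-facing
setDropBehind() updates it. Hence the AtomicReference plus the
compare-and-swap retry loop. A runnable sketch of that loop against a
stand-in immutable value (illustrative names, not the HDFS classes):

    import java.util.concurrent.atomic.AtomicReference;

    // Lock-free copy-on-write: if another thread swaps in a new strategy
    // between get() and compareAndSet(), the CAS fails and we rebuild from
    // the fresh value, so no concurrent update is lost.
    class CasStrategyHolder {
      static final class Strategy {
        final Boolean dropBehind;
        final Long readahead;
        Strategy(Boolean dropBehind, Long readahead) {
          this.dropBehind = dropBehind;
          this.readahead = readahead;
        }
      }

      private final AtomicReference<Strategy> strategy =
          new AtomicReference<Strategy>(new Strategy(null, null));

      void setDropBehind(Boolean dropBehind) {
        Strategy prev, next;
        do {
          prev = strategy.get();
          next = new Strategy(dropBehind, prev.readahead); // copy + one change
        } while (!strategy.compareAndSet(prev, next));
      }

      Strategy current() {
        return strategy.get(); // always a fully constructed, immutable object
      }
    }

DFSInputStream can use plain assignment instead because its setters are
synchronized and its readers snapshot under the same lock; the output path
has readers that take no such lock, so it needs the atomic variant.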

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java

@@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.datanode;
  * The caching strategy we should use for an HDFS read or write operation.
  */
 public class CachingStrategy {
-  private Boolean dropBehind; // null = use server defaults
-  private Long readahead; // null = use server defaults
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
 
   public static CachingStrategy newDefaultStrategy() {
     return new CachingStrategy(null, null);

@@ -32,8 +32,28 @@ public class CachingStrategy {
     return new CachingStrategy(true, null);
   }
 
-  public CachingStrategy duplicate() {
-    return new CachingStrategy(this.dropBehind, this.readahead);
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
   }
 
   public CachingStrategy(Boolean dropBehind, Long readahead) {

@@ -45,18 +65,10 @@ public class CachingStrategy {
     return dropBehind;
   }
 
-  public void setDropBehind(Boolean dropBehind) {
-    this.dropBehind = dropBehind;
-  }
-
   public Long getReadahead() {
     return readahead;
   }
 
-  public void setReadahead(Long readahead) {
-    this.readahead = readahead;
-  }
-
   public String toString() {
     return "CachingStrategy(dropBehind=" + dropBehind +
       ", readahead=" + readahead + ")";
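With duplicate() gone and both fields final, deriving a variant now means
building a new object; the original can be shared freely across threads. A
short usage sketch of the Builder API as defined above (the demo class name
is illustrative):

    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    public class BuilderDemo {
      public static void main(String[] args) {
        CachingStrategy base = CachingStrategy.newDefaultStrategy();

        // Derive a tuned variant; 'base' is untouched.
        CachingStrategy tuned = new CachingStrategy.Builder(base)
            .setReadahead(4L * 1024 * 1024)
            .setDropBehind(true)
            .build();

        System.out.println(base);  // CachingStrategy(dropBehind=null, readahead=null)
        System.out.println(tuned); // CachingStrategy(dropBehind=true, readahead=4194304)
      }
    }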

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;

@@ -138,7 +139,8 @@ public class TestConnCache {
           Matchers.anyLong(),
           Matchers.anyInt(),
           Matchers.anyBoolean(),
-          Matchers.anyString());
+          Matchers.anyString(),
+          (CachingStrategy)Matchers.anyObject());
 
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
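Since getBlockReader() gained a parameter, the test's matcher list must
cover it too: Mockito requires that once any argument uses a matcher, all of
them do, and the cast pins the untyped anyObject() to the new parameter's
type. A minimal stand-alone sketch of the same matcher usage (the Reader
interface and demo class here are hypothetical, not HDFS types):

    import static org.mockito.Matchers.anyObject;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    public class MatcherDemo {
      // Stand-ins for the code under test.
      static final class CachingStrategy {}
      interface Reader {
        void open(String clientName, CachingStrategy strategy);
      }

      public static void main(String[] args) {
        Reader r = mock(Reader.class);
        r.open("client-1", new CachingStrategy());
        // All-or-nothing matchers: the cast gives anyObject() the type the
        // compiler needs for the second argument.
        verify(r).open(anyString(), (CachingStrategy) anyObject());
      }
    }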