HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430507 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2013-01-08 20:44:09 +00:00
parent db99f7f67d
commit 239b2742d0
31 changed files with 1401 additions and 775 deletions

View File

@@ -19,6 +19,7 @@
 package org.apache.hadoop.net;
 
 import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -37,7 +38,8 @@ import java.nio.channels.SelectionKey;
  * IllegalBlockingModeException.
  * Please use {@link SocketOutputStream} for writing.
  */
-class SocketInputStream extends InputStream
+@InterfaceAudience.LimitedPrivate("HDFS")
+public class SocketInputStream extends InputStream
     implements ReadableByteChannel {
 
   private Reader reader;

View File

@@ -260,4 +260,8 @@ public class SocketOutputStream extends OutputStream
       throws IOException {
     transferToFully(fileCh, position, count, null, null);
   }
+
+  public void setTimeout(int timeoutMs) {
+    writer.setTimeout(timeoutMs);
+  }
 }

View File

@@ -180,6 +180,9 @@ Trunk (Unreleased)
     HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class
     (Colin Patrick McCabe via todd)
 
+    HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes
+    (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

View File

@@ -18,10 +18,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -43,7 +41,18 @@ public interface BlockReader extends ByteBufferReadable {
    */
   long skip(long n) throws IOException;
 
-  void close() throws IOException;
+  /**
+   * Close the block reader.
+   *
+   * @param peerCache      The PeerCache to put the Peer we're using back
+   *                       into, or null if we should simply close the Peer
+   *                       we're using (along with its Socket).
+   *                       Some block readers, like BlockReaderLocal, may
+   *                       not make use of this parameter.
+   *
+   * @throws IOException
+   */
+  void close(PeerCache peerCache) throws IOException;
 
   /**
    * Read exactly the given amount of data, throwing an exception
@@ -60,20 +69,4 @@ public interface BlockReader extends ByteBufferReadable {
    * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
-
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  Socket takeSocket();
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  boolean hasSentStatusCode();
-
-  /**
-   * @return a reference to the streams this block reader is using.
-   */
-  IOStreamPair getStreams();
 }
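Note on the new close(PeerCache) contract above: the caller, not the reader, now decides whether the underlying connection is recycled. A minimal illustrative sketch, not part of the patch; it assumes it lives in org.apache.hadoop.hdfs so it can see the package-private PeerCache, and the class and method names are hypothetical:

package org.apache.hadoop.hdfs;

import java.io.IOException;

class BlockReaderCloseExample {
  // Read the whole buffer, then either recycle or drop the connection.
  static int readAndClose(BlockReader reader, PeerCache cache, byte[] buf)
      throws IOException {
    try {
      return reader.readAll(buf, 0, buf.length);
    } finally {
      // Passing a cache lets a remote reader hand its Peer back for reuse;
      // passing null forces the Peer (and its socket) to be closed.
      // BlockReaderLocal simply ignores the argument.
      reader.close(cache);
    }
  }
}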

View File

@@ -19,19 +19,18 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Utility class to create BlockReader implementations.
@@ -47,18 +46,73 @@ public class BlockReaderFactory {
   @InterfaceAudience.Private
   public static class Params {
     private final Conf conf;
-    private Socket socket = null;
+    /**
+     * The peer that this BlockReader will be connected to.
+     * You must set this.
+     */
+    private Peer peer = null;
+
+    /**
+     * The file name that this BlockReader pertains to.
+     * This is optional and only used for display and logging purposes.
+     */
     private String file = null;
+
+    /**
+     * The block that this BlockReader is reading.
+     * You must set this.
+     */
     private ExtendedBlock block = null;
+
+    /**
+     * The BlockTokenIdentifier to use, or null to use none.
+     */
     private Token<BlockTokenIdentifier> blockToken = null;
+
+    /**
+     * The offset in the block to start reading at.
+     */
     private long startOffset = 0;
+
+    /**
+     * The total number of bytes we might want to read, or -1 to assume no
+     * limit.
+     */
     private long len = -1;
+
+    /**
+     * The buffer size to use.
+     *
+     * If this is not set, we will use the default from the Conf.
+     */
     private int bufferSize;
+
+    /**
+     * Whether or not we should verify the checksum.
+     *
+     * This is used instead of conf.verifyChecksum, because there are some
+     * cases when we may want to explicitly turn off checksum verification,
+     * such as when the caller has explicitly asked for a file to be opened
+     * without checksum verification.
+     */
     private boolean verifyChecksum = true;
+
+    /**
+     * Whether or not we should try to use short circuit local reads.
+     */
     private boolean shortCircuitLocalReads = false;
+
+    /**
+     * The name of the client using this BlockReader, for logging and
+     * debugging purposes.
+     */
     private String clientName = "";
-    private DataEncryptionKey encryptionKey = null;
-    private IOStreamPair ioStreamPair = null;
+
+    /**
+     * The DataNode on which this Block resides.
+     * You must set this.
+     */
+    private DatanodeID datanodeID = null;
 
     public Params(Conf conf) {
       this.conf = conf;
@@ -67,11 +121,11 @@ public class BlockReaderFactory {
     public Conf getConf() {
       return conf;
     }
-    public Socket getSocket() {
-      return socket;
+    public Peer getPeer() {
+      return peer;
     }
-    public Params setSocket(Socket socket) {
-      this.socket = socket;
+    public Params setPeer(Peer peer) {
+      this.peer = peer;
       return this;
     }
     public String getFile() {
@@ -137,19 +191,12 @@ public class BlockReaderFactory {
       this.clientName = clientName;
       return this;
     }
-    public Params setEncryptionKey(DataEncryptionKey encryptionKey) {
-      this.encryptionKey = encryptionKey;
+    public Params setDatanodeID(DatanodeID datanodeID) {
+      this.datanodeID = datanodeID;
       return this;
     }
-    public DataEncryptionKey getEncryptionKey() {
-      return encryptionKey;
-    }
-    public IOStreamPair getIoStreamPair() {
-      return ioStreamPair;
-    }
-    public Params setIoStreamPair(IOStreamPair ioStreamPair) {
-      this.ioStreamPair = ioStreamPair;
-      return this;
+    public DatanodeID getDatanodeID() {
+      return datanodeID;
     }
   }
@@ -164,24 +211,27 @@ public class BlockReaderFactory {
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(Params params) throws IOException {
+    Preconditions.checkNotNull(params.getPeer());
+    Preconditions.checkNotNull(params.getBlock());
+    Preconditions.checkNotNull(params.getDatanodeID());
+    // First, let's set the read and write timeouts appropriately.
+    // This will keep us from blocking forever if something goes wrong during
+    // network communication.
+    Peer peer = params.getPeer();
+    peer.setReadTimeout(params.getConf().socketTimeout);
+    peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
     if (params.getConf().useLegacyBlockReader) {
-      if (params.getEncryptionKey() != null) {
-        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
-      }
+      // The legacy BlockReader doesn't require that the Peers it uses
+      // have associated ReadableByteChannels.  This makes it easier to use
+      // with some older Socket classes like, say, SocksSocketImpl.
+      //
+      // TODO: create a wrapper class that makes channel-less sockets look like
+      // they have a channel, so that we can finally remove the legacy
+      // RemoteBlockReader.  See HDFS-2534.
      return RemoteBlockReader.newBlockReader(params);
     } else {
-      Socket sock = params.getSocket();
-      if (params.getIoStreamPair() == null) {
-        params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock),
-            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
-        if (params.getEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  params.getIoStreamPair().out, params.getIoStreamPair().in,
-                  params.getEncryptionKey());
-          params.setIoStreamPair(encryptedStreams);
-        }
-      }
+      // The usual block reader.
       return RemoteBlockReader2.newBlockReader(params);
     }
   }
@@ -197,4 +247,4 @@ public class BlockReaderFactory {
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
 }
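For orientation, a hedged sketch of how the reworked Params builder above is meant to be driven. The setters mirror the ones DFSInputStream uses later in this commit; the wrapper class and variable names here are hypothetical, and the sketch assumes it sits in org.apache.hadoop.hdfs:

package org.apache.hadoop.hdfs;

import java.io.IOException;

import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

class NewBlockReaderExample {
  static BlockReader open(DFSClient.Conf conf, Peer peer, DatanodeID datanode,
      ExtendedBlock block, Token<BlockTokenIdentifier> token,
      String file, String clientName) throws IOException {
    return BlockReaderFactory.newBlockReader(
        new BlockReaderFactory.Params(conf)
            .setPeer(peer)              // required: the open connection to read over
            .setDatanodeID(datanode)    // required: where the replica lives
            .setBlock(block)            // required: which block to read
            .setBlockToken(token)
            .setFile(file)              // display/logging only
            .setClientName(clientName)
            .setStartOffset(0)
            .setLen(-1));               // -1 = no explicit length limit
  }
}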

View File

@@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockReader {
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     dataIn.close();
     if (checksumIn != null) {
       checksumIn.close();
@@ -675,19 +675,4 @@ class BlockReaderLocal implements BlockReader {
   public void readFully(byte[] buf, int off, int len) throws IOException {
     BlockReaderUtil.readFully(this, buf, off, len);
   }
-
-  @Override
-  public Socket takeSocket() {
-    return null;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return false;
-  }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return null;
-  }
 }

View File

@@ -191,7 +191,7 @@ public class DFSClient implements java.io.Closeable {
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   private final String authority;
-  final SocketCache socketCache;
+  final PeerCache peerCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
@@ -433,7 +433,7 @@ public class DFSClient implements java.io.Closeable {
           Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
 
-    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**

View File

@@ -32,12 +32,15 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
@@ -60,7 +64,7 @@ import org.apache.hadoop.security.token.Token;
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
-  private final SocketCache socketCache;
+  private final PeerCache peerCache;
   private final DFSClient dfsClient;
   private boolean closed = false;
@@ -110,7 +114,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.socketCache = dfsClient.socketCache;
+    this.peerCache = dfsClient.peerCache;
     prefetchSize = dfsClient.getConf().prefetchSize;
     timeWindow = dfsClient.getConf().timeWindow;
     nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -424,7 +428,7 @@
     // Will be getting a new BlockReader.
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
@@ -506,7 +510,7 @@
     dfsClient.checkOpen();
 
     if (blockReader != null) {
-      closeBlockReader(blockReader);
+      blockReader.close(peerCache);
       blockReader = null;
     }
     super.close();
@@ -833,7 +837,7 @@
         }
       } finally {
         if (reader != null) {
-          closeBlockReader(reader);
+          reader.close(peerCache);
         }
       }
       // Put chosen node into dead list, continue
@@ -841,16 +845,30 @@
     }
   }
 
-  /**
-   * Close the given BlockReader and cache its socket.
-   */
-  private void closeBlockReader(BlockReader reader) throws IOException {
-    if (reader.hasSentStatusCode()) {
-      IOStreamPair ioStreams = reader.getStreams();
-      Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock, ioStreams);
+  private Peer newPeer(InetSocketAddress addr) throws IOException {
+    Peer peer = null;
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+          dfsClient.getRandomLocalInterfaceAddr(),
+          dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocket(sock);
+      // Add encryption if configured.
+      DataEncryptionKey key = dfsClient.getDataEncryptionKey();
+      if (key != null) {
+        peer = new EncryptedPeer(peer, key);
+      }
+      success = true;
+      return peer;
+    } finally {
+      if (!success) {
+        IOUtils.closeQuietly(peer);
+        IOUtils.closeQuietly(sock);
+      }
     }
-    reader.close();
   }
 
   /**
@@ -896,40 +914,16 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      SocketAndStreams sockAndStreams = null;
+      Peer peer = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
      // want to fail the read.
       if (retries < nCachedConnRetry) {
-        sockAndStreams = socketCache.get(dnAddr);
+        peer = peerCache.get(chosenNode);
       }
-      Socket sock;
-      if (sockAndStreams == null) {
+      if (peer == null) {
+        peer = newPeer(dnAddr);
         fromCache = false;
-        sock = dfsClient.socketFactory.createSocket();
-
-        // TCP_NODELAY is crucial here because of bad interactions between
-        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
-        // between the client and DN, the conversation looks like:
-        //   1. Client -> DN: Read block X
-        //   2. DN -> Client: data for block X
-        //   3. Client -> DN: Status OK (successful read)
-        //   4. Client -> DN: Read block Y
-        // The fact that step #3 and #4 are both in the client->DN direction
-        // triggers Nagling. If the DN is using delayed ACKs, this results
-        // in a delay of 40ms or more.
-        //
-        // TCP_NODELAY disables nagling and thus avoids this performance
-        // disaster.
-        sock.setTcpNoDelay(true);
-
-        NetUtils.connect(sock, dnAddr,
-            dfsClient.getRandomLocalInterfaceAddr(),
-            dfsClient.getConf().socketTimeout);
-        sock.setSoTimeout(dfsClient.getConf().socketTimeout);
-      } else {
-        sock = sockAndStreams.sock;
       }
 
       try {
@@ -939,19 +933,13 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
             setFile(file).setBlock(block).setBlockToken(blockToken).
             setStartOffset(startOffset).setLen(len).
             setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum).
-            setClientName(clientName).
-            setEncryptionKey(dfsClient.getDataEncryptionKey()).
-            setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams).
-            setSocket(sock));
+            setClientName(clientName).setDatanodeID(chosenNode).
+            setPeer(peer));
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        if (sockAndStreams != null) {
-          sockAndStreams.close();
-        } else {
-          sock.close();
-        }
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
+        IOUtils.closeQuietly(peer);
         err = ex;
       }
     }
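The hunk above boils down to a reusable pattern: try a cached Peer first, dial a new one otherwise, and quietly discard a Peer that fails so the loop can retry. A standalone sketch of that pattern follows; the names and the simplified loop are illustrative, and the real logic (including the fromCache bookkeeping) lives in DFSInputStream#getBlockReader:

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

abstract class CachedPeerRetrySketch {
  // Supplied by the surrounding class in the real code.
  abstract Peer newPeer(InetSocketAddress addr) throws IOException;
  abstract BlockReader buildReader(Peer peer, DatanodeID dn) throws IOException;

  BlockReader getReader(PeerCache cache, DatanodeID dn, InetSocketAddress addr,
      int cachedRetries) throws IOException {
    IOException lastError = null;
    for (int i = 0; i <= cachedRetries; i++) {
      // Skip the cache on the last attempt so that a pile of stale cached
      // sockets cannot use up every retry.
      Peer peer = (i < cachedRetries) ? cache.get(dn) : null;
      if (peer == null) {
        peer = newPeer(addr);       // fresh TCP (and possibly encrypted) peer
      }
      try {
        return buildReader(peer, dn);
      } catch (IOException e) {
        IOUtils.closeQuietly(peer); // this peer is no good; drop it and retry
        lastError = e;
      }
    }
    // The loop above ran at least once, so lastError is non-null here.
    throw lastError;
  }
}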

View File

@@ -18,69 +18,55 @@
 package org.apache.hadoop.hdfs;
 
-import java.io.Closeable;
-import java.net.Socket;
-import java.net.SocketAddress;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
-import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
  */
-class SocketCache {
-  private static final Log LOG = LogFactory.getLog(SocketCache.class);
+class PeerCache {
+  private static final Log LOG = LogFactory.getLog(PeerCache.class);
 
-  @InterfaceAudience.Private
-  static class SocketAndStreams implements Closeable {
-    public final Socket sock;
-    public final IOStreamPair ioStreams;
-    long createTime;
-
-    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
-      this.sock = s;
-      this.ioStreams = ioStreams;
-      this.createTime = Time.monotonicNow();
-    }
-
-    @Override
-    public void close() {
-      if (ioStreams != null) {
-        IOUtils.closeStream(ioStreams.in);
-        IOUtils.closeStream(ioStreams.out);
-      }
-      IOUtils.closeSocket(sock);
-    }
-
-    public long getCreateTime() {
-      return this.createTime;
+  private static class Value {
+    private final Peer peer;
+    private final long time;
+
+    Value(Peer peer, long time) {
+      this.peer = peer;
+      this.time = time;
+    }
+
+    Peer getPeer() {
+      return peer;
+    }
+
+    long getTime() {
+      return time;
     }
   }
 
   private Daemon daemon;
   /** A map for per user per datanode. */
-  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+  private static LinkedListMultimap<DatanodeID, Value> multimap =
     LinkedListMultimap.create();
   private static int capacity;
   private static long expiryPeriod;
-  private static SocketCache scInstance = new SocketCache();
+  private static PeerCache instance = new PeerCache();
   private static boolean isInitedOnce = false;
 
-  public static synchronized SocketCache getInstance(int c, long e) {
+  public static synchronized PeerCache getInstance(int c, long e) {
     // capacity is only initialized once
     if (isInitedOnce == false) {
       capacity = c;
@@ -102,7 +88,7 @@ class SocketCache {
     }
   }
 
-    return scInstance;
+    return instance;
   }
 
   private boolean isDaemonStarted() {
@@ -119,44 +105,45 @@ class SocketCache {
       @Override
       public void run() {
         try {
-          SocketCache.this.run();
+          PeerCache.this.run();
         } catch(InterruptedException e) {
           //noop
         } finally {
-          SocketCache.this.clear();
+          PeerCache.this.clear();
         }
       }
 
       @Override
       public String toString() {
-        return String.valueOf(SocketCache.this);
+        return String.valueOf(PeerCache.this);
       }
     });
     daemon.start();
   }
 
   /**
-   * Get a cached socket to the given address.
-   * @param remote  Remote address the socket is connected to.
-   * @return  A socket with unknown state, possibly closed underneath. Or null.
+   * Get a cached peer connected to the given DataNode.
+   * @param dnId         The DataNode to get a Peer for.
+   * @return             An open Peer connected to the DN, or null if none
+   *                     was found.
    */
-  public synchronized SocketAndStreams get(SocketAddress remote) {
+  public synchronized Peer get(DatanodeID dnId) {
     if (capacity <= 0) { // disabled
       return null;
     }
 
-    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    List<Value> sockStreamList = multimap.get(dnId);
     if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
+    Iterator<Value> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
-      SocketAndStreams candidate = iter.next();
+      Value candidate = iter.next();
       iter.remove();
-      if (!candidate.sock.isClosed()) {
-        return candidate;
+      if (!candidate.getPeer().isClosed()) {
+        return candidate.getPeer();
       }
     }
     return null;
@@ -166,30 +153,22 @@ class SocketCache {
    * Give an unused socket to the cache.
    * @param sock socket not used by anyone.
    */
-  public synchronized void put(Socket sock, IOStreamPair ioStreams) {
-    Preconditions.checkNotNull(sock);
-    SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
+  public synchronized void put(DatanodeID dnId, Peer peer) {
+    Preconditions.checkNotNull(dnId);
+    Preconditions.checkNotNull(peer);
+    if (peer.isClosed()) return;
     if (capacity <= 0) {
       // Cache disabled.
-      s.close();
+      IOUtils.cleanup(LOG, peer);
       return;
     }
 
     startExpiryDaemon();
 
-    SocketAddress remoteAddr = sock.getRemoteSocketAddress();
-    if (remoteAddr == null) {
-      LOG.warn("Cannot cache (unconnected) socket with no remote address: " +
-               sock);
-      IOUtils.closeSocket(sock);
-      return;
-    }
-
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, s);
+    multimap.put(dnId, new Value(peer, Time.monotonicNow()));
   }
 
   public synchronized int size() {
@@ -201,18 +180,17 @@ class SocketCache {
   */
   private synchronized void evictExpired(long expiryPeriod) {
     while (multimap.size() != 0) {
-      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+      Iterator<Entry<DatanodeID, Value>> iter =
         multimap.entries().iterator();
-      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      Entry<DatanodeID, Value> entry = iter.next();
       // if oldest socket expired, remove it
       if (entry == null ||
-        Time.monotonicNow() - entry.getValue().getCreateTime() <
+        Time.monotonicNow() - entry.getValue().getTime() <
         expiryPeriod) {
         break;
       }
+      IOUtils.cleanup(LOG, entry.getValue().getPeer());
       iter.remove();
-      SocketAndStreams s = entry.getValue();
-      s.close();
     }
   }
 
@@ -220,16 +198,18 @@ class SocketCache {
   * Evict the oldest entry in the cache.
   */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+    // We can get the oldest element immediately, because of an interesting
+    // property of LinkedListMultimap: its iterator traverses entries in the
+    // order that they were added.
+    Iterator<Entry<DatanodeID, Value>> iter =
      multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache! " +
         "capacity: " + capacity);
     }
-    Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+    Entry<DatanodeID, Value> entry = iter.next();
+    IOUtils.cleanup(LOG, entry.getValue().getPeer());
     iter.remove();
-    SocketAndStreams s = entry.getValue();
-    s.close();
   }
 
   /**
@@ -253,9 +233,10 @@ class SocketCache {
   /**
   * Empty the cache, and close all sockets.
   */
-  private synchronized void clear() {
-    for (SocketAndStreams sockAndStream : multimap.values()) {
-      sockAndStream.close();
+  @VisibleForTesting
+  synchronized void clear() {
+    for (Value value : multimap.values()) {
+      IOUtils.cleanup(LOG, value.getPeer());
    }
     multimap.clear();
   }
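For reference, a short hedged sketch of the cache's intended lifecycle based only on the methods visible above (getInstance, put, get). DFSClient supplies the capacity and expiry from dfsClientConf.socketCacheCapacity and socketCacheExpiry; the literals and names below are placeholders:

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

class PeerCacheSketch {
  void demo(DatanodeID datanode, Peer peerThatFinishedCleanly) {
    // Process-wide singleton: the first caller's capacity and expiry (ms) win.
    PeerCache cache = PeerCache.getInstance(16, 3000);

    // A reader that sent its read status hands the connection back for reuse.
    cache.put(datanode, peerThatFinishedCleanly);

    // Later, before dialing a new connection to the same DataNode:
    Peer reusable = cache.get(datanode);
    if (reusable == null) {
      // Nothing usable was cached (expired, evicted, or closed);
      // the caller must create a new Peer itself.
    }
  }
}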

View File

@@ -25,25 +25,20 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -56,7 +51,8 @@ import org.apache.hadoop.util.DataChecksum;
 @Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  private final Peer peer;
+  private final DatanodeID datanodeID;
   private final DataInputStream in;
   private DataChecksum checksum;
@@ -126,9 +122,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(peer, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
+        sendReadResult(peer, Status.SUCCESS);
       }
     }
     return nRead;
@@ -322,7 +318,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead,
+      Peer peer, DatanodeID datanodeID) {
     // Path is used only for printing block and file information in debug
     super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/,
           1, verifyChecksum,
@@ -330,7 +327,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
 
-    this.dnSock = dnSock;
+    this.peer = peer;
+    this.datanodeID = datanodeID;
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
@@ -367,9 +365,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params)
       throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    Socket sock = params.getSocket();
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
+          params.getPeer().getOutputStream()));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
         params.getClientName(), params.getStartOffset(), params.getLen());
 
@@ -377,13 +374,13 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     // Get bytes in block, set streams
     //
     DataInputStream in = new DataInputStream(
-        new BufferedInputStream(NetUtils.getInputStream(sock),
+        new BufferedInputStream(params.getPeer().getInputStream(),
            params.getBufferSize()));
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
        vintPrefixed(in));
-    RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(),
-        params.getFile());
+    RemoteBlockReader2.checkSuccess(status, params.getPeer(),
+        params.getBlock(), params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -402,18 +399,20 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
     return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(),
        params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(),
-        params.getStartOffset(), firstChunkOffset, params.getLen(), sock);
+        params.getStartOffset(), firstChunkOffset, params.getLen(),
+        params.getPeer(), params.getDatanodeID());
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
-    // in will be closed when its Socket is closed.
+    // in will be closed when its Peer is closed.
   }
 
   @Override
@@ -427,37 +426,21 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     return readFully(this, buf, offset, len);
   }
 
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
   * When the reader reaches end of the read, it sends a status response
   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
   * closing our connection (which we will re-open), but won't affect
   * data correctness.
   */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Peer peer, Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      RemoteBlockReader2.writeReadResult(
-          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
-          statusCode);
+      RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               datanodeID + ": " + e.getMessage());
     }
   }
 
@@ -477,12 +460,4 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public int read(ByteBuffer buf) throws IOException {
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    // This class doesn't support encryption, which is the only thing this
-    // method is used for. See HDFS-3637.
-    return null;
-  }
 }

View File

@@ -25,16 +25,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -42,13 +41,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.net.SocketInputWrapper;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -79,11 +76,8 @@ import org.apache.hadoop.util.DataChecksum;
 public class RemoteBlockReader2 implements BlockReader {
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
-
-  Socket dnSock;
-  // for now just sending the status code (e.g. checksumOk) after the read.
-  private IOStreamPair ioStreams;
-  private final ReadableByteChannel in;
+  private final DatanodeID datanodeID;
+  private final Peer peer;
   private DataChecksum checksum;
 
   private PacketReceiver packetReceiver = new PacketReceiver(true);
@@ -115,6 +109,11 @@ public class RemoteBlockReader2 implements BlockReader {
   /** Amount of unread data in the current received packet */
   int dataLeft = 0;
 
+  @VisibleForTesting
+  public Peer getPeer() {
+    return peer;
+  }
+
   @Override
   public synchronized int read(byte[] buf, int off, int len)
       throws IOException {
@@ -155,7 +154,7 @@ public class RemoteBlockReader2 implements BlockReader {
   private void readNextPacket() throws IOException {
     //Read packet headers.
-    packetReceiver.receiveNextPacket(in);
+    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
 
     PacketHeader curHeader = packetReceiver.getHeader();
     curDataSlice = packetReceiver.getDataSlice();
@@ -236,7 +235,7 @@ public class RemoteBlockReader2 implements BlockReader {
       LOG.trace("Reading empty packet at end of read");
     }
 
-    packetReceiver.receiveNextPacket(in);
+    packetReceiver.receiveNextPacket(peer.getInputStreamChannel());
 
     PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
@@ -247,11 +246,10 @@ public class RemoteBlockReader2 implements BlockReader {
   }
 
   protected RemoteBlockReader2(BlockReaderFactory.Params params,
-      DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) {
+      DataChecksum checksum, long firstChunkOffset) {
     // Path is used only for printing block and file information in debug
-    this.dnSock = params.getSocket();
-    this.ioStreams = params.getIoStreamPair();
-    this.in = in;
+    this.datanodeID = params.getDatanodeID();
+    this.peer = params.getPeer();
     this.checksum = checksum;
     this.verifyChecksum = params.getVerifyChecksum();
     this.startOffset = Math.max( params.getStartOffset(), 0 );
@@ -268,38 +266,19 @@ public class RemoteBlockReader2 implements BlockReader {
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close(PeerCache peerCache) throws IOException {
     packetReceiver.close();
     startOffset = -1;
     checksum = null;
-    if (dnSock != null) {
-      dnSock.close();
+    if (peerCache != null && sentStatusCode) {
+      peerCache.put(datanodeID, peer);
+    } else {
+      peer.close();
     }
 
     // in will be closed when its Socket is closed.
   }
 
-  /**
-   * Take the socket used to talk to the DN.
-   */
-  @Override
-  public Socket takeSocket() {
-    assert hasSentStatusCode() :
-      "BlockReader shouldn't give back sockets mid-read";
-    Socket res = dnSock;
-    dnSock = null;
-    return res;
-  }
-
-  /**
-   * Whether the BlockReader has reached the end of its input stream
-   * and successfully sent a status code back to the datanode.
-   */
-  @Override
-  public boolean hasSentStatusCode() {
-    return sentStatusCode;
-  }
-
   /**
    * When the reader reaches end of the read, it sends a status response
@@ -308,14 +287,14 @@ public class RemoteBlockReader2 implements BlockReader {
    * data correctness.
    */
   void sendReadResult(Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + dnSock;
+    assert !sentStatusCode : "already sent status code to " + peer;
     try {
-      writeReadResult(ioStreams.out, statusCode);
+      writeReadResult(peer.getOutputStream(), statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               dnSock.getInetAddress() + ": " + e.getMessage());
+               peer.getRemoteAddressString() + ": " + e.getMessage());
     }
   }
 
@@ -373,29 +352,20 @@ public class RemoteBlockReader2 implements BlockReader {
   */
   public static BlockReader newBlockReader(BlockReaderFactory.Params params)
      throws IOException {
-    IOStreamPair ioStreams = params.getIoStreamPair();
-    ReadableByteChannel ch;
-    if (ioStreams.in instanceof SocketInputWrapper) {
-      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
-    } else {
-      ch = (ReadableByteChannel) ioStreams.in;
-    }
-
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-      ioStreams.out));
+      params.getPeer().getOutputStream()));
     new Sender(out).readBlock(params.getBlock(), params.getBlockToken(),
        params.getClientName(), params.getStartOffset(), params.getLen());
 
     //
     // Get bytes in block
     //
-    DataInputStream in = new DataInputStream(ioStreams.in);
+    DataInputStream in = new DataInputStream(params.getPeer().getInputStream());
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
        vintPrefixed(in));
-    checkSuccess(status, params.getSocket(), params.getBlock(),
-        params.getFile());
+    checkSuccess(status, params.getPeer(), params.getBlock(), params.getFile());
     ReadOpChecksumInfoProto checksumInfo =
      status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -412,33 +382,28 @@ public class RemoteBlockReader2 implements BlockReader {
          params.getStartOffset() + " for file " + params.getFile());
     }
 
-    return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch);
+    return new RemoteBlockReader2(params, checksum, firstChunkOffset);
   }
 
   static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
+      BlockOpResponseProto status, Peer peer,
      ExtendedBlock block, String file)
      throws IOException {
    if (status.getStatus() != Status.SUCCESS) {
      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
        throw new InvalidBlockTokenException(
            "Got access token error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
+            + peer.getLocalAddressString() + ", remote="
+            + peer.getRemoteAddressString() + ", for file " + file
            + ", for pool " + block.getBlockPoolId() + " block "
            + block.getBlockId() + "_" + block.getGenerationStamp());
      } else {
        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
+            + peer.getLocalAddressString() + ", remote="
+            + peer.getRemoteAddressString() + ", for file " + file
            + ", for pool " + block.getBlockPoolId() + " block "
            + block.getBlockId() + "_" + block.getGenerationStamp());
      }
    }
   }
-
-  @Override
-  public IOStreamPair getStreams() {
-    return ioStreams;
-  }
 }

View File

@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.ReadableByteChannel;
/**
* Represents a peer that we communicate with by using a basic Socket
* that has no associated Channel.
*
*/
class BasicInetPeer implements Peer {
private final Socket socket;
private final OutputStream out;
private final InputStream in;
private final boolean isLocal;
public BasicInetPeer(Socket socket) throws IOException {
this.socket = socket;
this.out = socket.getOutputStream();
this.in = socket.getInputStream();
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
}
@Override
public ReadableByteChannel getInputStreamChannel() {
/*
* This Socket has no channel, so there's nothing to return here.
*/
return null;
}
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
socket.setSoTimeout(timeoutMs);
}
@Override
public int getReceiveBufferSize() throws IOException {
return socket.getReceiveBufferSize();
}
@Override
public boolean getTcpNoDelay() throws IOException {
return socket.getTcpNoDelay();
}
@Override
public void setWriteTimeout(int timeoutMs) {
/*
* We can't implement write timeouts. :(
*
* Java provides no facility to set a blocking write timeout on a Socket.
* You can simulate a blocking write with a timeout by using
* non-blocking I/O. However, we can't use nio here, because this Socket
* doesn't have an associated Channel.
*
* See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for
* more details.
*/
}
@Override
public boolean isClosed() {
return socket.isClosed();
}
@Override
public void close() throws IOException {
socket.close();
}
@Override
public String getRemoteAddressString() {
return socket.getRemoteSocketAddress().toString();
}
@Override
public String getLocalAddressString() {
return socket.getLocalSocketAddress().toString();
}
@Override
public InputStream getInputStream() throws IOException {
return in;
}
@Override
public OutputStream getOutputStream() throws IOException {
return out;
}
@Override
public boolean isLocal() {
return isLocal;
}
@Override
public String toString() {
return "BasicInetPeer(" + socket.toString() + ")";
}
}
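For context, the reason both BasicInetPeer and NioInetPeer (next file) exist is that some SocketFactory implementations, such as the SOCKS one, produce sockets without an NIO channel. A factory along the lines of TcpPeerServer.peerFromSocket, which DFSInputStream calls earlier in this commit, would presumably pick the wrapper like this; treat the selection logic as an assumption, not the actual TcpPeerServer code:

package org.apache.hadoop.hdfs.net;

import java.io.IOException;
import java.net.Socket;

class PeerFromSocketSketch {
  static Peer wrap(Socket socket) throws IOException {
    if (socket.getChannel() == null) {
      // No channel: only blocking I/O is available, and write timeouts
      // cannot be enforced (see BasicInetPeer#setWriteTimeout).
      return new BasicInetPeer(socket);
    }
    // Channel-backed socket: use the NIO wrappers, which support both
    // read and write timeouts.
    return new NioInetPeer(socket);
  }
}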

View File

@@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
/**
* Represents a peer that we communicate with by using an encrypted
* communications medium.
*/
@InterfaceAudience.Private
public class EncryptedPeer implements Peer {
private final Peer enclosedPeer;
/**
* An encrypted InputStream.
*/
private final InputStream in;
/**
* An encrypted OutputStream.
*/
private final OutputStream out;
/**
* An encrypted ReadableByteChannel.
*/
private final ReadableByteChannel channel;
public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
throws IOException {
this.enclosedPeer = enclosedPeer;
IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
this.in = ios.in;
this.out = ios.out;
this.channel = ios.in instanceof ReadableByteChannel ?
(ReadableByteChannel)ios.in : null;
}
@Override
public ReadableByteChannel getInputStreamChannel() {
return channel;
}
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
enclosedPeer.setReadTimeout(timeoutMs);
}
@Override
public int getReceiveBufferSize() throws IOException {
return enclosedPeer.getReceiveBufferSize();
}
@Override
public boolean getTcpNoDelay() throws IOException {
return enclosedPeer.getTcpNoDelay();
}
@Override
public void setWriteTimeout(int timeoutMs) throws IOException {
enclosedPeer.setWriteTimeout(timeoutMs);
}
@Override
public boolean isClosed() {
return enclosedPeer.isClosed();
}
@Override
public void close() throws IOException {
try {
in.close();
} finally {
try {
out.close();
} finally {
enclosedPeer.close();
}
}
}
@Override
public String getRemoteAddressString() {
return enclosedPeer.getRemoteAddressString();
}
@Override
public String getLocalAddressString() {
return enclosedPeer.getLocalAddressString();
}
@Override
public InputStream getInputStream() throws IOException {
return in;
}
@Override
public OutputStream getOutputStream() throws IOException {
return out;
}
@Override
public boolean isLocal() {
return enclosedPeer.isLocal();
}
@Override
public String toString() {
return "EncryptedPeer(" + enclosedPeer + ")";
}
}
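
If a caller already holds a plain Peer and a DataEncryptionKey (obtained, for example, from the NameNode as NamenodeFsck does later in this change), wrapping it might look like the sketch below. The helper name is made up; the cleanup-on-failure pattern follows peerFromSocketAndKey further down.

import java.io.IOException;

import org.apache.hadoop.hdfs.net.EncryptedPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.io.IOUtils;

public class EncryptedPeerSketch {
  /**
   * Wrap an existing peer so that further traffic goes through the
   * streams negotiated by DataTransferEncryptor. If the handshake in
   * the EncryptedPeer constructor fails, close the enclosed peer so
   * its socket does not leak.
   */
  static Peer encryptIfNeeded(Peer plain, DataEncryptionKey key)
      throws IOException {
    if (key == null) {
      return plain;                     // encryption not enabled
    }
    boolean success = false;
    try {
      Peer secured = new EncryptedPeer(plain, key);
      success = true;
      return secured;
    } finally {
      if (!success) {
        IOUtils.cleanup(null, plain);   // also closes the underlying socket
      }
    }
  }
}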

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.net.SocketInputStream;
import org.apache.hadoop.net.SocketOutputStream;
/**
* Represents a peer that we communicate with by using non-blocking I/O
* on a Socket.
*/
class NioInetPeer implements Peer {
private final Socket socket;
/**
* An InputStream which simulates blocking I/O with timeouts using NIO.
*/
private final SocketInputStream in;
/**
* An OutputStream which simulates blocking I/O with timeouts using NIO.
*/
private final SocketOutputStream out;
private final boolean isLocal;
NioInetPeer(Socket socket) throws IOException {
this.socket = socket;
this.in = new SocketInputStream(socket.getChannel(), 0);
this.out = new SocketOutputStream(socket.getChannel(), 0);
this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
}
@Override
public ReadableByteChannel getInputStreamChannel() {
return socket.getChannel();
}
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
in.setTimeout(timeoutMs);
}
@Override
public int getReceiveBufferSize() throws IOException {
return socket.getReceiveBufferSize();
}
@Override
public boolean getTcpNoDelay() throws IOException {
return socket.getTcpNoDelay();
}
@Override
public void setWriteTimeout(int timeoutMs) throws IOException {
out.setTimeout(timeoutMs);
}
@Override
public boolean isClosed() {
return socket.isClosed();
}
@Override
public void close() throws IOException {
    // We always close the outermost streams -- in this case, 'in' and 'out'.
// Closing either one of these will also close the Socket.
try {
in.close();
} finally {
out.close();
}
}
@Override
public String getRemoteAddressString() {
return socket.getRemoteSocketAddress().toString();
}
@Override
public String getLocalAddressString() {
return socket.getLocalSocketAddress().toString();
}
@Override
public InputStream getInputStream() throws IOException {
return in;
}
@Override
public OutputStream getOutputStream() throws IOException {
return out;
}
@Override
public boolean isLocal() {
return isLocal;
}
@Override
public String toString() {
return "NioInetPeer(" + socket.toString() + ")";
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Represents a connection to a peer.
*/
@InterfaceAudience.Private
public interface Peer extends Closeable {
/**
* @return The input stream channel associated with this
* peer, or null if it has none.
*/
public ReadableByteChannel getInputStreamChannel();
/**
* Set the read timeout on this peer.
*
* @param timeoutMs The timeout in milliseconds.
*/
public void setReadTimeout(int timeoutMs) throws IOException;
/**
* @return The receive buffer size.
*/
public int getReceiveBufferSize() throws IOException;
/**
* @return True if TCP_NODELAY is turned on.
*/
public boolean getTcpNoDelay() throws IOException;
/**
* Set the write timeout on this peer.
*
* Note: this is not honored for BasicInetPeer.
   * See {@link BasicInetPeer#setWriteTimeout} for details.
*
* @param timeoutMs The timeout in milliseconds.
*/
public void setWriteTimeout(int timeoutMs) throws IOException;
/**
* @return true only if the peer is closed.
*/
public boolean isClosed();
/**
* Close the peer.
*
* It's safe to re-close a Peer that is already closed.
*/
public void close() throws IOException;
/**
* @return A string representing the remote end of our
* connection to the peer.
*/
public String getRemoteAddressString();
/**
* @return A string representing the local end of our
* connection to the peer.
*/
public String getLocalAddressString();
/**
* @return An InputStream associated with the Peer.
* This InputStream will be valid until you close
* this peer with Peer#close.
*/
public InputStream getInputStream() throws IOException;
/**
* @return An OutputStream associated with the Peer.
* This OutputStream will be valid until you close
* this peer with Peer#close.
*/
public OutputStream getOutputStream() throws IOException;
/**
* @return True if the peer resides on the same
   *                   computer as we do.
*/
public boolean isLocal();
}
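
A minimal sketch of how client code might drive this interface: bound the wait with setReadTimeout, then read from the stream the peer exposes. The length-prefixed framing here is invented for illustration and is not an HDFS wire format.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.net.Peer;

public class PeerReadSketch {
  /**
   * Read a length-prefixed reply from a peer with a bounded wait, using
   * only the methods declared on Peer. The framing (an unsigned short
   * length followed by that many bytes) is a made-up format for the
   * sketch, not an HDFS protocol.
   */
  static byte[] readReply(Peer peer, int readTimeoutMs) throws IOException {
    peer.setReadTimeout(readTimeoutMs);
    DataInputStream in = new DataInputStream(peer.getInputStream());
    int len = in.readUnsignedShort();
    byte[] reply = new byte[len];
    in.readFully(reply);
    return reply;
  }
}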

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.Closeable;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
import java.net.SocketTimeoutException;
@InterfaceAudience.Private
public interface PeerServer extends Closeable {
/**
* Set the receive buffer size of the PeerServer.
*
* @param size The receive buffer size.
*/
public void setReceiveBufferSize(int size) throws IOException;
/**
* Listens for a connection to be made to this server and accepts
* it. The method blocks until a connection is made.
*
* @exception IOException if an I/O error occurs when waiting for a
* connection.
* @exception SecurityException if a security manager exists and its
* <code>checkAccept</code> method doesn't allow the operation.
* @exception SocketTimeoutException if a timeout was previously set and
* the timeout has been reached.
*/
public Peer accept() throws IOException, SocketTimeoutException;
/**
* @return A string representation of the address we're
* listening on.
*/
public String getListeningString();
/**
* Free the resources associated with this peer server.
* This normally includes sockets, etc.
*
* @throws IOException If there is an error closing the PeerServer
*/
public void close() throws IOException;
}
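
A rough accept-loop sketch against this interface, modeled loosely on the DataXceiverServer changes later in this patch; the Handler type and the AtomicBoolean shutdown flag are illustrative assumptions, not part of the API.

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.io.IOUtils;

public class AcceptLoopSketch {
  /** Illustrative handler; the real datanode hands the peer to a DataXceiver. */
  interface Handler {
    void handle(Peer peer);
  }

  /**
   * Accept peers until asked to stop, handing each one to the handler,
   * which takes over ownership (and eventually closes it).
   */
  static void serve(PeerServer server, Handler handler, AtomicBoolean shouldRun) {
    while (shouldRun.get()) {
      Peer peer = null;
      try {
        peer = server.accept();
        handler.handle(peer);
      } catch (SocketTimeoutException ignored) {
        // wake up periodically so a shutdown request is noticed
      } catch (IOException e) {
        IOUtils.cleanup(null, peer);   // don't leak a half-accepted peer
      }
    }
  }
}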

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.net;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
@InterfaceAudience.Private
public class TcpPeerServer implements PeerServer {
static Log LOG = LogFactory.getLog(TcpPeerServer.class);
private final ServerSocket serverSocket;
public static Peer peerFromSocket(Socket socket)
throws IOException {
Peer peer = null;
boolean success = false;
try {
// TCP_NODELAY is crucial here because of bad interactions between
// Nagle's Algorithm and Delayed ACKs. With connection keepalive
// between the client and DN, the conversation looks like:
// 1. Client -> DN: Read block X
// 2. DN -> Client: data for block X
// 3. Client -> DN: Status OK (successful read)
// 4. Client -> DN: Read block Y
// The fact that step #3 and #4 are both in the client->DN direction
// triggers Nagling. If the DN is using delayed ACKs, this results
// in a delay of 40ms or more.
//
// TCP_NODELAY disables nagling and thus avoids this performance
// disaster.
socket.setTcpNoDelay(true);
SocketChannel channel = socket.getChannel();
if (channel == null) {
peer = new BasicInetPeer(socket);
} else {
peer = new NioInetPeer(socket);
}
success = true;
return peer;
} finally {
if (!success) {
if (peer != null) peer.close();
socket.close();
}
}
}
public static Peer peerFromSocketAndKey(Socket s,
DataEncryptionKey key) throws IOException {
Peer peer = null;
boolean success = false;
try {
peer = peerFromSocket(s);
if (key != null) {
peer = new EncryptedPeer(peer, key);
}
success = true;
return peer;
} finally {
if (!success) {
IOUtils.cleanup(null, peer);
}
}
}
/**
* Create a non-secure TcpPeerServer.
*
* @param socketWriteTimeout The Socket write timeout in ms.
* @param bindAddr The address to bind to.
* @throws IOException
*/
public TcpPeerServer(int socketWriteTimeout,
InetSocketAddress bindAddr) throws IOException {
this.serverSocket = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(serverSocket, bindAddr, 0);
}
/**
* Create a secure TcpPeerServer.
*
* @param secureResources Security resources.
*/
public TcpPeerServer(SecureResources secureResources) {
this.serverSocket = secureResources.getStreamingSocket();
}
/**
   * @return     the socket address on which this TcpPeerServer is listening.
*/
public InetSocketAddress getStreamingAddr() {
return new InetSocketAddress(
serverSocket.getInetAddress().getHostAddress(),
serverSocket.getLocalPort());
}
@Override
public void setReceiveBufferSize(int size) throws IOException {
this.serverSocket.setReceiveBufferSize(size);
}
@Override
public Peer accept() throws IOException, SocketTimeoutException {
Peer peer = peerFromSocket(serverSocket.accept());
return peer;
}
@Override
public String getListeningString() {
return serverSocket.getLocalSocketAddress().toString();
}
@Override
public void close() throws IOException {
try {
serverSocket.close();
} catch(IOException e) {
LOG.error("error closing TcpPeerServer: ", e);
}
}
@Override
public String toString() {
return "TcpPeerServer(" + getListeningString() + ")";
}
}
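
Two hypothetical call sites for this class, one per side of a connection; the buffer size and the decision to accept a single peer are arbitrary choices for the sketch, not recommendations.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

public class TcpPeerServerSketch {
  /** Server side: bind, tune the receive buffer, accept a single peer.
   *  A long-lived server would keep the PeerServer open and loop instead
   *  (see the accept-loop sketch after the PeerServer interface above). */
  static Peer acceptOne(InetSocketAddress bindAddr, int writeTimeoutMs)
      throws IOException {
    TcpPeerServer server = new TcpPeerServer(writeTimeoutMs, bindAddr);
    server.setReceiveBufferSize(128 * 1024);
    return server.accept();
  }

  /** Client side: turn an already-connected Socket into a Peer, optionally
   *  wrapping it for encrypted data transfer when a key is available. */
  static Peer toPeer(Socket connected, DataEncryptionKey keyOrNull)
      throws IOException {
    return TcpPeerServer.peerFromSocketAndKey(connected, keyOrNull);
  }
}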

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -206,12 +208,14 @@ public class JspHelper {
// Use the block name for file name. // Use the block name for file name.
BlockReader blockReader = BlockReaderFactory.newBlockReader( BlockReader blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)). new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s). setPeer(TcpPeerServer.peerFromSocketAndKey(s, encryptionKey)).
setBlockToken(blockToken).setStartOffset(offsetIntoBlock). setBlockToken(blockToken).setStartOffset(offsetIntoBlock).
setLen(amtToRead). setLen(amtToRead).
setEncryptionKey(encryptionKey).
setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)). setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)).
setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp))); setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
setDatanodeID(new DatanodeID(addr.getAddress().toString(),
addr.getHostName(), poolId, addr.getPort(), 0, 0)));
byte[] buf = new byte[(int)amtToRead]; byte[] buf = new byte[(int)amtToRead];
int readOffset = 0; int readOffset = 0;
int retries = 2; int retries = 2;
@ -229,8 +233,7 @@ public class JspHelper {
amtToRead -= numRead; amtToRead -= numRead;
readOffset += numRead; readOffset += numRead;
} }
blockReader = null; blockReader.close(null);
s.close();
out.print(HtmlQuoting.quoteHtmlChars(new String(buf))); out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
} }

View File

@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -522,24 +523,19 @@ public class DataNode extends Configured
private void initDataXceiver(Configuration conf) throws IOException { private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided // find free port or use privileged port provided
ServerSocket ss; TcpPeerServer tcpPeerServer;
if (secureResources == null) { if (secureResources != null) {
InetSocketAddress addr = DataNode.getStreamingAddr(conf); tcpPeerServer = new TcpPeerServer(secureResources);
ss = (dnConf.socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, addr, 0);
} else { } else {
ss = secureResources.getStreamingSocket(); tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf));
} }
ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
ss.getLocalPort());
LOG.info("Opened streaming server at " + streamingAddr); LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer"); this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup, this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this)); new DataXceiverServer(tcpPeerServer, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty this.threadGroup.setDaemon(true); // auto destroy when empty
} }

View File

@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -64,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -79,8 +79,7 @@ class DataXceiver extends Receiver implements Runnable {
public static final Log LOG = DataNode.LOG; public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog; static final Log ClientTraceLog = DataNode.ClientTraceLog;
private final Socket s; private final Peer peer;
private final boolean isLocal; //is a local connection?
private final String remoteAddress; // address of remote side private final String remoteAddress; // address of remote side
private final String localAddress; // local address of this daemon private final String localAddress; // local address of this daemon
private final DataNode datanode; private final DataNode datanode;
@ -88,7 +87,7 @@ class DataXceiver extends Receiver implements Runnable {
private final DataXceiverServer dataXceiverServer; private final DataXceiverServer dataXceiverServer;
private final boolean connectToDnViaHostname; private final boolean connectToDnViaHostname;
private long opStartTime; //the start time of receiving an Op private long opStartTime; //the start time of receiving an Op
private final SocketInputWrapper socketIn; private final InputStream socketIn;
private OutputStream socketOut; private OutputStream socketOut;
/** /**
@ -97,25 +96,23 @@ class DataXceiver extends Receiver implements Runnable {
*/ */
private String previousOpClientName; private String previousOpClientName;
public static DataXceiver create(Socket s, DataNode dn, public static DataXceiver create(Peer peer, DataNode dn,
DataXceiverServer dataXceiverServer) throws IOException { DataXceiverServer dataXceiverServer) throws IOException {
return new DataXceiver(s, dn, dataXceiverServer); return new DataXceiver(peer, dn, dataXceiverServer);
} }
private DataXceiver(Socket s, private DataXceiver(Peer peer, DataNode datanode,
DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException { DataXceiverServer dataXceiverServer) throws IOException {
this.s = s; this.peer = peer;
this.dnConf = datanode.getDnConf(); this.dnConf = datanode.getDnConf();
this.socketIn = NetUtils.getInputStream(s); this.socketIn = peer.getInputStream();
this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout); this.socketOut = peer.getOutputStream();
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode; this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer; this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
remoteAddress = s.getRemoteSocketAddress().toString(); remoteAddress = peer.getRemoteAddressString();
localAddress = s.getLocalSocketAddress().toString(); localAddress = peer.getLocalAddressString();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: " LOG.debug("Number of active connections is: "
@ -155,11 +152,10 @@ class DataXceiver extends Receiver implements Runnable {
public void run() { public void run() {
int opsProcessed = 0; int opsProcessed = 0;
Op op = null; Op op = null;
dataXceiverServer.childSockets.add(s); dataXceiverServer.addPeer(peer);
try { try {
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn; InputStream input = socketIn;
if (dnConf.encryptDataTransfer) { if (dnConf.encryptDataTransfer) {
IOStreamPair encryptedStreams = null; IOStreamPair encryptedStreams = null;
@ -169,8 +165,9 @@ class DataXceiver extends Receiver implements Runnable {
dnConf.encryptionAlgorithm); dnConf.encryptionAlgorithm);
} catch (InvalidMagicNumberException imne) { } catch (InvalidMagicNumberException imne) {
LOG.info("Failed to read expected encryption handshake from client " + LOG.info("Failed to read expected encryption handshake from client " +
"at " + s.getInetAddress() + ". Perhaps the client is running an " + "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"older version of Hadoop which does not support encryption"); "is running an older version of Hadoop which does not support " +
"encryption");
return; return;
} }
input = encryptedStreams.in; input = encryptedStreams.in;
@ -189,9 +186,9 @@ class DataXceiver extends Receiver implements Runnable {
try { try {
if (opsProcessed != 0) { if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0; assert dnConf.socketKeepaliveTimeout > 0;
socketIn.setTimeout(dnConf.socketKeepaliveTimeout); peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else { } else {
socketIn.setTimeout(dnConf.socketTimeout); peer.setReadTimeout(dnConf.socketTimeout);
} }
op = readOp(); op = readOp();
} catch (InterruptedIOException ignored) { } catch (InterruptedIOException ignored) {
@ -202,7 +199,7 @@ class DataXceiver extends Receiver implements Runnable {
if (opsProcessed > 0 && if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) { (err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops"); LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
} }
} else { } else {
throw err; throw err;
@ -212,13 +209,13 @@ class DataXceiver extends Receiver implements Runnable {
// restore normal timeout // restore normal timeout
if (opsProcessed != 0) { if (opsProcessed != 0) {
s.setSoTimeout(dnConf.socketTimeout); peer.setReadTimeout(dnConf.socketTimeout);
} }
opStartTime = now(); opStartTime = now();
processOp(op); processOp(op);
++opsProcessed; ++opsProcessed;
} while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " + LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " +
((op == null) ? "unknown" : op.name()) + " operation " + ((op == null) ? "unknown" : op.name()) + " operation " +
@ -230,9 +227,8 @@ class DataXceiver extends Receiver implements Runnable {
+ datanode.getXceiverCount()); + datanode.getXceiverCount());
} }
updateCurrentThreadName("Cleaning up"); updateCurrentThreadName("Cleaning up");
dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in); IOUtils.closeStream(in);
IOUtils.closeSocket(s);
dataXceiverServer.childSockets.remove(s);
} }
} }
@ -286,8 +282,9 @@ class DataXceiver extends Receiver implements Runnable {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in)); HdfsProtoUtil.vintPrefixed(in));
if (!stat.hasStatus()) { if (!stat.hasStatus()) {
LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + LOG.warn("Client " + peer.getRemoteAddressString() +
"code after reading. Will close connection."); " did not send a valid status code after reading. " +
"Will close connection.");
IOUtils.closeStream(out); IOUtils.closeStream(out);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
@ -320,7 +317,7 @@ class DataXceiver extends Receiver implements Runnable {
//update metrics //update metrics
datanode.metrics.addReadBlockOp(elapsed()); datanode.metrics.addReadBlockOp(elapsed());
datanode.metrics.incrReadsFromClient(isLocal); datanode.metrics.incrReadsFromClient(peer.isLocal());
} }
@Override @Override
@ -358,8 +355,8 @@ class DataXceiver extends Receiver implements Runnable {
LOG.debug("isDatanode=" + isDatanode LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient + ", isClient=" + isClient
+ ", isTransfer=" + isTransfer); + ", isTransfer=" + isTransfer);
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
" tcp no delay " + s.getTcpNoDelay()); " tcp no delay " + peer.getTcpNoDelay());
} }
// We later mutate block's generation stamp and length, but we need to // We later mutate block's generation stamp and length, but we need to
@ -390,8 +387,8 @@ class DataXceiver extends Receiver implements Runnable {
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver // open a block receiver
blockReceiver = new BlockReceiver(block, in, blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(), peer.getRemoteAddressString(),
s.getLocalSocketAddress().toString(), peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum); clientname, srcDataNode, datanode, requestedChecksum);
} else { } else {
@ -546,7 +543,7 @@ class DataXceiver extends Receiver implements Runnable {
//update metrics //update metrics
datanode.metrics.addWriteBlockOp(elapsed()); datanode.metrics.addWriteBlockOp(elapsed());
datanode.metrics.incrWritesFromClient(isLocal); datanode.metrics.incrWritesFromClient(peer.isLocal());
} }
@Override @Override
@ -554,7 +551,7 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken, final Token<BlockTokenIdentifier> blockToken,
final String clientName, final String clientName,
final DatanodeInfo[] targets) throws IOException { final DatanodeInfo[] targets) throws IOException {
checkAccess(null, true, blk, blockToken, checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName; previousOpClientName = clientName;
updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
@ -641,8 +638,9 @@ class DataXceiver extends Receiver implements Runnable {
} }
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to copy block " + block.getBlockId() + " to " String msg = "Not able to copy block " + block.getBlockId() + " " +
+ s.getRemoteSocketAddress() + " because threads quota is exceeded."; "to " + peer.getRemoteAddressString() + " because threads " +
"quota is exceeded.";
LOG.info(msg); LOG.info(msg);
sendResponse(ERROR, msg); sendResponse(ERROR, msg);
return; return;
@ -671,7 +669,7 @@ class DataXceiver extends Receiver implements Runnable {
datanode.metrics.incrBytesRead((int) read); datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead(); datanode.metrics.incrBlocksRead();
LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress()); LOG.info("Copied " + block + " to " + peer.getRemoteAddressString());
} catch (IOException ioe) { } catch (IOException ioe) {
isOpSuccess = false; isOpSuccess = false;
LOG.info("opCopyBlock " + block + " received exception " + ioe); LOG.info("opCopyBlock " + block + " received exception " + ioe);
@ -716,8 +714,9 @@ class DataXceiver extends Receiver implements Runnable {
} }
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to receive block " + block.getBlockId() + " from " String msg = "Not able to receive block " + block.getBlockId() +
+ s.getRemoteSocketAddress() + " because threads quota is exceeded."; " from " + peer.getRemoteAddressString() + " because threads " +
"quota is exceeded.";
LOG.warn(msg); LOG.warn(msg);
sendResponse(ERROR, msg); sendResponse(ERROR, msg);
return; return;
@ -794,7 +793,7 @@ class DataXceiver extends Receiver implements Runnable {
// notify name node // notify name node
datanode.notifyNamenodeReceivedBlock(block, delHint); datanode.notifyNamenodeReceivedBlock(block, delHint);
LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress()); LOG.info("Moved " + block + " from " + peer.getRemoteAddressString());
} catch (IOException ioe) { } catch (IOException ioe) {
opStatus = ERROR; opStatus = ERROR;
@ -817,7 +816,7 @@ class DataXceiver extends Receiver implements Runnable {
try { try {
sendResponse(opStatus, errMsg); sendResponse(opStatus, errMsg);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); LOG.warn("Error writing reply back to " + peer.getRemoteAddressString());
} }
IOUtils.closeStream(proxyOut); IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver); IOUtils.closeStream(blockReceiver);
@ -871,7 +870,7 @@ class DataXceiver extends Receiver implements Runnable {
} }
private void checkAccess(DataOutputStream out, final boolean reply, private void checkAccess(OutputStream out, final boolean reply,
final ExtendedBlock blk, final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t, final Token<BlockTokenIdentifier> t,
final Op op, final Op op,
@ -886,11 +885,6 @@ class DataXceiver extends Receiver implements Runnable {
} catch(InvalidToken e) { } catch(InvalidToken e) {
try { try {
if (reply) { if (reply) {
if (out == null) {
out = new DataOutputStream(
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
}
BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
.setStatus(ERROR_ACCESS_TOKEN); .setStatus(ERROR_ACCESS_TOKEN);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) { if (mode == BlockTokenSecretManager.AccessMode.WRITE) {

View File

@ -18,18 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon;
class DataXceiverServer implements Runnable { class DataXceiverServer implements Runnable {
public static final Log LOG = DataNode.LOG; public static final Log LOG = DataNode.LOG;
ServerSocket ss; private final PeerServer peerServer;
DataNode datanode; private final DataNode datanode;
// Record all sockets opened for data transfer private final Set<Peer> peers = new HashSet<Peer>();
Set<Socket> childSockets = Collections.synchronizedSet(
new HashSet<Socket>());
/** /**
* Maximal number of concurrent xceivers per node. * Maximal number of concurrent xceivers per node.
@ -109,10 +105,10 @@ class DataXceiverServer implements Runnable {
long estimateBlockSize; long estimateBlockSize;
DataXceiverServer(ServerSocket ss, Configuration conf, DataXceiverServer(PeerServer peerServer, Configuration conf,
DataNode datanode) { DataNode datanode) {
this.ss = ss; this.peerServer = peerServer;
this.datanode = datanode; this.datanode = datanode;
this.maxXceiverCount = this.maxXceiverCount =
@ -130,12 +126,10 @@ class DataXceiverServer implements Runnable {
@Override @Override
public void run() { public void run() {
Peer peer = null;
while (datanode.shouldRun) { while (datanode.shouldRun) {
Socket s = null;
try { try {
s = ss.accept(); peer = peerServer.accept();
s.setTcpNoDelay(true);
// Timeouts are set within DataXceiver.run()
// Make sure the xceiver count is not exceeded // Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount(); int curXceiverCount = datanode.getXceiverCount();
@ -146,7 +140,7 @@ class DataXceiverServer implements Runnable {
} }
new Daemon(datanode.threadGroup, new Daemon(datanode.threadGroup,
DataXceiver.create(s, datanode, this)) DataXceiver.create(peer, datanode, this))
.start(); .start();
} catch (SocketTimeoutException ignored) { } catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run // wake up to see if should continue to run
@ -157,10 +151,10 @@ class DataXceiverServer implements Runnable {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
} }
} catch (IOException ie) { } catch (IOException ie) {
IOUtils.closeSocket(s); IOUtils.cleanup(null, peer);
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
} catch (OutOfMemoryError ie) { } catch (OutOfMemoryError ie) {
IOUtils.closeSocket(s); IOUtils.cleanup(null, peer);
// DataNode can run out of memory if there is too many transfers. // DataNode can run out of memory if there is too many transfers.
// Log the event, Sleep for 30 seconds, other transfers may complete by // Log the event, Sleep for 30 seconds, other transfers may complete by
// then. // then.
@ -176,33 +170,35 @@ class DataXceiverServer implements Runnable {
datanode.shouldRun = false; datanode.shouldRun = false;
} }
} }
synchronized (this) {
for (Peer p : peers) {
IOUtils.cleanup(LOG, p);
}
}
try { try {
ss.close(); peerServer.close();
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn(datanode.getDisplayName() LOG.warn(datanode.getDisplayName()
+ " :DataXceiverServer: close exception", ie); + " :DataXceiverServer: close exception", ie);
} }
} }
void kill() { void kill() {
assert datanode.shouldRun == false : assert datanode.shouldRun == false :
"shoudRun should be set to false before killing"; "shoudRun should be set to false before killing";
try { try {
this.ss.close(); this.peerServer.close();
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
} }
}
synchronized void addPeer(Peer peer) {
peers.add(peer);
}
// close all the sockets that were accepted earlier synchronized void closePeer(Peer peer) {
synchronized (childSockets) { peers.remove(peer);
for (Iterator<Socket> it = childSockets.iterator(); IOUtils.cleanup(null, peer);
it.hasNext();) {
Socket thissock = it.next();
try {
thissock.close();
} catch (IOException e) {
}
}
}
} }
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -559,13 +560,13 @@ public class NamenodeFsck {
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)). new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s).setBlock(block). setPeer(TcpPeerServer.peerFromSocketAndKey(s,
namenode.getRpcServer().getDataEncryptionKey())).
setBlock(block).
setFile(BlockReaderFactory.getFileName(targetAddr, setFile(BlockReaderFactory.getFileName(targetAddr,
block.getBlockPoolId(), block.getBlockId())). block.getBlockPoolId(), block.getBlockId())).
setBlockToken(lblock.getBlockToken()). setBlockToken(lblock.getBlockToken()).
setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()). setDatanodeID(chosenNode));
setLen(-1));
} catch (IOException ex) { } catch (IOException ex) {
// Put chosen node into dead list, continue // Put chosen node into dead list, continue
LOG.info("Failed to connect to " + targetAddr + ":" + ex); LOG.info("Failed to connect to " + targetAddr + ":" + ex);

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -152,13 +153,13 @@ public class BlockReaderTestUtil {
return BlockReaderFactory.newBlockReader( return BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)). new BlockReaderFactory.Params(new Conf(conf)).
setSocket(sock). setPeer(TcpPeerServer.peerFromSocket(sock)).
setFile(targetAddr.toString() + ":" + block.getBlockId()). setFile(targetAddr.toString() + ":" + block.getBlockId()).
setBlock(block).setBlockToken(testBlock.getBlockToken()). setBlock(block).setBlockToken(testBlock.getBlockToken()).
setStartOffset(offset).setLen(lenToRead). setStartOffset(offset).setLen(lenToRead).
setBufferSize(conf.getInt( setBufferSize(conf.getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)). CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)).
setVerifyChecksum(true)); setVerifyChecksum(true).setDatanodeID(nodes[0]));
} }
/** /**

View File

@ -61,7 +61,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(); reader.close(null);
} }
/** /**
@ -76,7 +76,7 @@ public class TestClientBlockVerification {
// We asked the blockreader for the whole file, and only read // We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK // half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
reader.close(); reader.close(null);
} }
/** /**
@ -92,7 +92,7 @@ public class TestClientBlockVerification {
// And read half the file // And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(); reader.close(null);
} }
/** /**
@ -111,7 +111,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, startOffset, length)); util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true); util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK); verify(reader).sendReadResult(Status.CHECKSUM_OK);
reader.close(); reader.close(null);
} }
} }
} }

View File

@ -18,28 +18,20 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.PrivilegedExceptionAction; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -55,59 +47,31 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096; static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE; static final int FILE_SIZE = 3 * BLOCK_SIZE;
final static int CACHE_SIZE = 4;
final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
static BlockReaderTestUtil util = null;
/** /**
* A mock Answer to remember the BlockReader used. * A mock Answer to remember the BlockReader used.
* *
* It verifies that all invocation to DFSInputStream.getBlockReader() * It verifies that all invocation to DFSInputStream.getBlockReader()
* use the same socket. * use the same peer.
*/ */
private class MockGetBlockReader implements Answer<RemoteBlockReader2> { private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null; public RemoteBlockReader2 reader = null;
private Socket sock = null; private Peer peer = null;
@Override @Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader; RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod(); reader = (RemoteBlockReader2) invocation.callRealMethod();
if (sock == null) { if (peer == null) {
sock = reader.dnSock; peer = reader.getPeer();
} else if (prevReader != null) { } else if (prevReader != null) {
assertSame("DFSInputStream should use the same socket", Assert.assertSame("DFSInputStream should use the same peer",
sock, reader.dnSock); peer, reader.getPeer());
} }
return reader; return reader;
} }
} }
@BeforeClass
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
/* create a socket cache. There is only one socket cache per jvm */
cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
cluster = util.getCluster();
conf = util.getConf();
fs = cluster.getFileSystem();
authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
}
/** /**
* (Optionally) seek to position, read and verify data. * (Optionally) seek to position, read and verify data.
* *
@ -117,9 +81,10 @@ public class TestConnCache {
long pos, long pos,
byte[] buffer, byte[] buffer,
int offset, int offset,
int length) int length,
byte[] authenticData)
throws IOException { throws IOException {
assertTrue("Test buffer too small", buffer.length >= offset + length); Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0) if (pos >= 0)
in.seek(pos); in.seek(pos);
@ -129,7 +94,7 @@ public class TestConnCache {
while (length > 0) { while (length > 0) {
int cnt = in.read(buffer, offset, length); int cnt = in.read(buffer, offset, length);
assertTrue("Error in read", cnt > 0); Assert.assertTrue("Error in read", cnt > 0);
offset += cnt; offset += cnt;
length -= cnt; length -= cnt;
} }
@ -144,116 +109,23 @@ public class TestConnCache {
} }
} }
/**
* Test the SocketCache itself.
*/
@Test
public void testSocketCache() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
DFSClient client = new DFSClient(nnAddr, conf);
// Find out the DN addr
LocatedBlock block =
client.getNamenode().getBlockLocations(
testFile.toString(), 0, FILE_SIZE)
.getLocatedBlocks().get(0);
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getXferAddress();
// Make some sockets to the DN
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
dnSockets[i] = client.socketFactory.createSocket(
dnAddr.getAddress(), dnAddr.getPort());
}
// Insert a socket to the NN
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
cache.put(nnSock, null);
assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
cache.put(nnSock, null);
// Insert DN socks
for (Socket dnSock : dnSockets) {
cache.put(dnSock, null);
}
assertEquals("NN socket evicted", null, cache.get(nnAddr));
assertTrue("Evicted socket closed", nnSock.isClosed());
// Lookup the DN socks
for (Socket dnSock : dnSockets) {
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
dnSock.close();
}
assertEquals("Cache is empty", 0, cache.size());
}
/**
* Test the SocketCache expiry.
* Verify that socket cache entries expire after the set
* expiry time.
*/
@Test
public void testSocketCacheExpiry() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
DFSClient client = new DFSClient(nnAddr, conf);
// Find out the DN addr
LocatedBlock block =
client.getNamenode().getBlockLocations(
testFile.toString(), 0, FILE_SIZE)
.getLocatedBlocks().get(0);
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getXferAddress();
// Make some sockets to the DN and put in cache
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
dnSockets[i] = client.socketFactory.createSocket(
dnAddr.getAddress(), dnAddr.getPort());
cache.put(dnSockets[i], null);
}
// Client side still has the sockets cached
assertEquals(CACHE_SIZE, client.socketCache.size());
//sleep for a second and see if it expired
Thread.sleep(CACHE_EXPIRY_MS + 1000);
// Client side has no sockets cached
assertEquals(0, client.socketCache.size());
//sleep for another second and see if
//the daemon thread runs fine on empty cache
Thread.sleep(CACHE_EXPIRY_MS + 1000);
}
/** /**
* Read a file served entirely from one DN. Seek around and read from * Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket. * different offsets. And verify that they all use the same socket.
* * @throws Exception
* @throws java.io.IOException
*/ */
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testReadFromOneDN() throws IOException { public void testReadFromOneDN() throws Exception {
LOG.info("Starting testReadFromOneDN()"); BlockReaderTestUtil util = new BlockReaderTestUtil(1,
new HdfsConfiguration());
final Path testFile = new Path("/testConnCache.dat");
byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
DFSClient client = new DFSClient( DFSClient client = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); new InetSocketAddress("localhost",
DFSInputStream in = spy(client.open(testFile.toString())); util.getCluster().getNameNodePort()), util.getConf());
DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
LOG.info("opened " + testFile.toString()); LOG.info("opened " + testFile.toString());
byte[] dataBuf = new byte[BLOCK_SIZE]; byte[] dataBuf = new byte[BLOCK_SIZE];
MockGetBlockReader answer = new MockGetBlockReader(); MockGetBlockReader answer = new MockGetBlockReader();
@ -270,18 +142,15 @@ public class TestConnCache {
Matchers.anyString()); Matchers.anyString());
// Initial read // Initial read
pread(in, 0, dataBuf, 0, dataBuf.length); pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
// Read again and verify that the socket is the same // Read again and verify that the socket is the same
pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length); pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
pread(in, 1024, dataBuf, 0, dataBuf.length); authenticData);
pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
pread(in, 64, dataBuf, 0, dataBuf.length / 2); // No seek; just read
pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
in.close(); in.close();
} }
@AfterClass
public static void teardownCluster() throws Exception {
util.shutdown();
}
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -92,13 +93,13 @@ public class TestDataTransferKeepalive {
DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
// Clients that write aren't currently re-used. // Clients that write aren't currently re-used.
assertEquals(0, dfsClient.socketCache.size()); assertEquals(0, dfsClient.peerCache.size());
assertXceiverCount(0); assertXceiverCount(0);
// Reads the file, so we should get a // Reads the file, so we should get a
// cached socket, and should have an xceiver on the other side. // cached socket, and should have an xceiver on the other side.
DFSTestUtil.readFile(fs, TEST_FILE); DFSTestUtil.readFile(fs, TEST_FILE);
assertEquals(1, dfsClient.socketCache.size()); assertEquals(1, dfsClient.peerCache.size());
assertXceiverCount(1); assertXceiverCount(1);
// Sleep for a bit longer than the keepalive timeout // Sleep for a bit longer than the keepalive timeout
@ -109,13 +110,13 @@ public class TestDataTransferKeepalive {
// The socket is still in the cache, because we don't // The socket is still in the cache, because we don't
// notice that it's closed until we try to read // notice that it's closed until we try to read
// from it again. // from it again.
assertEquals(1, dfsClient.socketCache.size()); assertEquals(1, dfsClient.peerCache.size());
// Take it out of the cache - reading should // Take it out of the cache - reading should
// give an EOF. // give an EOF.
Socket s = dfsClient.socketCache.get(dnAddr).sock; Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
assertNotNull(s); assertNotNull(peer);
assertEquals(-1, NetUtils.getInputStream(s).read()); assertEquals(-1, peer.getInputStream().read());
} }
/** /**
@ -174,14 +175,14 @@ public class TestDataTransferKeepalive {
} }
DFSClient client = ((DistributedFileSystem)fs).dfs; DFSClient client = ((DistributedFileSystem)fs).dfs;
assertEquals(5, client.socketCache.size()); assertEquals(5, client.peerCache.size());
// Let all the xceivers timeout // Let all the xceivers timeout
Thread.sleep(1500); Thread.sleep(1500);
assertXceiverCount(0); assertXceiverCount(0);
// Client side still has the sockets cached // Client side still has the sockets cached
assertEquals(5, client.socketCache.size()); assertEquals(5, client.peerCache.size());
// Reading should not throw an exception. // Reading should not throw an exception.
DFSTestUtil.readFile(fs, TEST_FILE); DFSTestUtil.readFile(fs, TEST_FILE);

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
/**
* This class tests disabling client connection caching in a single node
* mini-cluster.
*/
public class TestDisableConnCache {
static final Log LOG = LogFactory.getLog(TestDisableConnCache.class);
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
/**
* Test that the socket cache can be disabled by setting the capacity to
* 0. Regression test for HDFS-3365.
* @throws Exception
*/
@Test
public void testDisableCache() throws Exception {
HdfsConfiguration confWithoutCache = new HdfsConfiguration();
// Configure a new instance with no peer caching, ensure that it doesn't
// cache anything
confWithoutCache.setInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache);
final Path testFile = new Path("/testConnCache.dat");
util.writeFile(testFile, FILE_SIZE / 1024);
FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf());
try {
DFSTestUtil.readFile(fsWithoutCache, testFile);
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size());
} finally {
fsWithoutCache.close();
util.shutdown();
}
}
}
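The same knob applies outside of tests: setting the socket-cache capacity to 0 disables peer caching for a client instance. A minimal sketch, assuming only the configuration key and the FileSystem.newInstance call used in the test above (the class and method names are hypothetical):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class NoPeerCacheClient {
  static FileSystem newUncachedFs() throws IOException {
    Configuration conf = new HdfsConfiguration();
    // Capacity 0 means the client keeps no idle data-transfer connections around.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
    // newInstance() bypasses the FileSystem cache, so the setting applies to this
    // handle only.
    return FileSystem.newInstance(conf);
  }
}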

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.net.Peer;
import org.junit.Test;
public class TestPeerCache {
static final Log LOG = LogFactory.getLog(TestPeerCache.class);
private static final int CAPACITY = 3;
private static final int EXPIRY_PERIOD = 20;
private static PeerCache cache =
PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
private static class FakePeer implements Peer {
private boolean closed = false;
private DatanodeID dnId;
public FakePeer(DatanodeID dnId) {
this.dnId = dnId;
}
@Override
public ReadableByteChannel getInputStreamChannel() {
throw new UnsupportedOperationException();
}
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int getReceiveBufferSize() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean getTcpNoDelay() throws IOException {
return false;
}
@Override
public void setWriteTimeout(int timeoutMs) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public String getRemoteAddressString() {
return dnId.getInfoAddr();
}
@Override
public String getLocalAddressString() {
return "127.0.0.1:123";
}
@Override
public InputStream getInputStream() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isLocal() {
return true;
}
@Override
public String toString() {
return "FakePeer(dnId=" + dnId + ")";
}
}
@Test
public void testAddAndRetrieve() throws Exception {
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
FakePeer peer = new FakePeer(dnId);
cache.put(dnId, peer);
assertTrue(!peer.isClosed());
assertEquals(1, cache.size());
assertEquals(peer, cache.get(dnId));
assertEquals(0, cache.size());
cache.clear();
}
@Test
public void testExpiry() throws Exception {
DatanodeID dnIds[] = new DatanodeID[CAPACITY];
FakePeer peers[] = new FakePeer[CAPACITY];
for (int i = 0; i < CAPACITY; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id",
100, 101, 102);
peers[i] = new FakePeer(dnIds[i]);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
// Check that the peers are cached
assertEquals(CAPACITY, cache.size());
// Wait for the peers to expire
Thread.sleep(EXPIRY_PERIOD * 50);
assertEquals(0, cache.size());
// make sure that the peers were closed when they were expired
for (int i = 0; i < CAPACITY; ++i) {
assertTrue(peers[i].isClosed());
}
// sleep for another second and see if
// the daemon thread runs fine on empty cache
Thread.sleep(EXPIRY_PERIOD * 50);
cache.clear();
}
@Test
public void testEviction() throws Exception {
DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
FakePeer peers[] = new FakePeer[CAPACITY + 1];
for (int i = 0; i < dnIds.length; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id_" + i,
100, 101, 102);
peers[i] = new FakePeer(dnIds[i]);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
// Check that the peers are cached
assertEquals(CAPACITY, cache.size());
// Add another entry and check that the first entry was evicted
cache.put(dnIds[CAPACITY], peers[CAPACITY]);
assertEquals(CAPACITY, cache.size());
assertSame(null, cache.get(dnIds[0]));
// Make sure that the other entries are still there
for (int i = 1; i < CAPACITY; ++i) {
Peer peer = cache.get(dnIds[i]);
assertSame(peers[i], peer);
assertTrue(!peer.isClosed());
peer.close();
}
assertEquals(1, cache.size());
cache.clear();
}
@Test
public void testMultiplePeersWithSameDnId() throws Exception {
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
while (!peers.isEmpty()) {
Peer peer = cache.get(dnId);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);
}
assertEquals(0, cache.size());
cache.clear();
}
}
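Taken together, the tests above pin down the cache's contract: put() stores an open peer keyed by DatanodeID, get() removes and returns one (or null), expired entries are closed by the cache, and several peers may be cached for the same datanode. A rough sketch of the check-out/return cycle a caller might follow, with connectToDatanode() as a hypothetical stand-in for whatever actually opens a new connection:

package org.apache.hadoop.hdfs;

import java.io.IOException;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

class PeerCacheUsageSketch {
  // Hypothetical: open a brand-new connection to the datanode.
  static Peer connectToDatanode(DatanodeID dnId) throws IOException {
    throw new UnsupportedOperationException("illustration only");
  }

  static void transferFrom(PeerCache cache, DatanodeID dnId) throws IOException {
    Peer peer = cache.get(dnId);        // reuse a cached connection if one exists
    if (peer == null) {
      peer = connectToDatanode(dnId);
    }
    boolean reusable = false;
    try {
      // ... drive the transfer over peer.getInputStream()/getOutputStream() ...
      reusable = true;
    } finally {
      if (reusable && !peer.isClosed()) {
        cache.put(dnId, peer);          // healthy connection goes back for reuse
      } else {
        peer.close();                   // anything suspect is simply closed
      }
    }
  }
}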

View File

@ -1,171 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* This class tests the client connection caching in a single node
* mini-cluster.
*/
public class TestSocketCache {
static final Log LOG = LogFactory.getLog(TestSocketCache.class);
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
final static int CACHE_SIZE = 4;
final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
static BlockReaderTestUtil util = null;
/**
* A mock Answer to remember the BlockReader used.
*
* It verifies that all invocations of DFSInputStream.getBlockReader()
* use the same socket.
*/
private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
public RemoteBlockReader2 reader = null;
private Socket sock = null;
@Override
public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
RemoteBlockReader2 prevReader = reader;
reader = (RemoteBlockReader2) invocation.callRealMethod();
if (sock == null) {
sock = reader.dnSock;
} else if (prevReader != null) {
assertSame("DFSInputStream should use the same socket",
sock, reader.dnSock);
}
return reader;
}
}
@BeforeClass
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
HdfsConfiguration confWithoutCache = new HdfsConfiguration();
confWithoutCache.setInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
cluster = util.getCluster();
conf = util.getConf();
authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
}
/**
* (Optionally) seek to position, read and verify data.
*
* Seek to specified position if pos is non-negative.
*/
private void pread(DFSInputStream in,
long pos,
byte[] buffer,
int offset,
int length)
throws IOException {
assertTrue("Test buffer too small", buffer.length >= offset + length);
if (pos >= 0)
in.seek(pos);
LOG.info("Reading from file of size " + in.getFileLength() +
" at offset " + in.getPos());
while (length > 0) {
int cnt = in.read(buffer, offset, length);
assertTrue("Error in read", cnt > 0);
offset += cnt;
length -= cnt;
}
// Verify
for (int i = 0; i < length; ++i) {
byte actual = buffer[i];
byte expect = authenticData[(int)pos + i];
assertEquals("Read data mismatch at file offset " + (pos + i) +
". Expects " + expect + "; got " + actual,
actual, expect);
}
}
/**
* Test that the socket cache can be disabled by setting the capacity to
* 0. Regression test for HDFS-3365.
*/
@Test
public void testDisableCache() throws IOException {
LOG.info("Starting testDisableCache()");
// Configure a new instance with no caching, ensure that it doesn't
// cache anything
FileSystem fsWithoutCache = FileSystem.newInstance(conf);
try {
DFSTestUtil.readFile(fsWithoutCache, testFile);
assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
} finally {
fsWithoutCache.close();
}
}
@AfterClass
public static void teardownCluster() throws Exception {
util.shutdown();
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -147,9 +148,10 @@ public class TestBlockTokenWithDFS {
"test-blockpoolid", block.getBlockId()); "test-blockpoolid", block.getBlockId());
blockReader = BlockReaderFactory.newBlockReader( blockReader = BlockReaderFactory.newBlockReader(
new BlockReaderFactory.Params(new Conf(conf)). new BlockReaderFactory.Params(new Conf(conf)).
setSocket(s).setBlock(block).setFile(file). setPeer(TcpPeerServer.peerFromSocket(s)).
setBlock(block).setFile(file).
setBlockToken(lblock.getBlockToken()).setStartOffset(0). setBlockToken(lblock.getBlockToken()).setStartOffset(0).
setLen(-1)); setLen(-1).setDatanodeID(nodes[0]));
} catch (IOException ex) { } catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) { if (ex instanceof InvalidBlockTokenException) {
assertFalse("OP_READ_BLOCK: access token is invalid, " assertFalse("OP_READ_BLOCK: access token is invalid, "

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -284,8 +285,9 @@ public class TestDataNodeVolumeFailure {
setFile(BlockReaderFactory.getFileName(targetAddr, setFile(BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid", block.getBlockId())). "test-blockpoolid", block.getBlockId())).
setBlock(block).setBlockToken(lblock.getBlockToken()). setBlock(block).setBlockToken(lblock.getBlockToken()).
setSocket(s)); setPeer(TcpPeerServer.peerFromSocket(s)).
blockReader.close(); setDatanodeID(datanode));
blockReader.close(null);
} }
/** /**