diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java index 46039a5506e..cfa7b01e813 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.net; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import java.io.InputStream; import java.net.Socket; import java.net.SocketTimeoutException; @@ -37,7 +38,8 @@ import java.nio.channels.SelectionKey; * IllegalBlockingModeException. * Please use {@link SocketOutputStream} for writing. */ -class SocketInputStream extends InputStream +@InterfaceAudience.LimitedPrivate("HDFS") +public class SocketInputStream extends InputStream implements ReadableByteChannel { private Reader reader; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java index 091c684059b..ead1d7b2b05 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java @@ -260,4 +260,8 @@ public class SocketOutputStream extends OutputStream throws IOException { transferToFully(fileCh, position, count, null, null); } + + public void setTimeout(int timeoutMs) { + writer.setTimeout(timeoutMs); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6ad7ccbb8e4..064e56f6ca9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -180,6 +180,9 @@ Trunk (Unreleased) HDFS-4352. Encapsulate arguments to BlockReaderFactory in a class (Colin Patrick McCabe via todd) + HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes + (Colin Patrick McCabe via todd) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index cd6dc2d25ed..2bbae525898 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -18,10 +18,8 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.net.Socket; import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; /** * A BlockReader is responsible for reading a single block @@ -43,7 +41,18 @@ public interface BlockReader extends ByteBufferReadable { */ long skip(long n) throws IOException; - void close() throws IOException; + /** + * Close the block reader. + * + * @param peerCache The PeerCache to put the Peer we're using back + * into, or null if we should simply close the Peer + * we're using (along with its Socket). + * Some block readers, like BlockReaderLocal, may + * not make use of this parameter. 
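+ * For example, DFSInputStream passes its PeerCache here so that a fully-read connection can be reused by a later read; callers that do not cache connections can simply pass null.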
+ * + * @throws IOException + */ + void close(PeerCache peerCache) throws IOException; /** * Read exactly the given amount of data, throwing an exception @@ -60,20 +69,4 @@ public interface BlockReader extends ByteBufferReadable { * filled or the next call will return EOF. */ int readAll(byte[] buf, int offset, int len) throws IOException; - - /** - * Take the socket used to talk to the DN. - */ - Socket takeSocket(); - - /** - * Whether the BlockReader has reached the end of its input stream - * and successfully sent a status code back to the datanode. - */ - boolean hasSentStatusCode(); - - /** - * @return a reference to the streams this block reader is using. - */ - IOStreamPair getStreams(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 26ae5fb9258..e00e99095c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -19,19 +19,18 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; +import com.google.common.base.Preconditions; + /** * Utility class to create BlockReader implementations. @@ -47,18 +46,73 @@ public class BlockReaderFactory { @InterfaceAudience.Private public static class Params { private final Conf conf; - private Socket socket = null; + /** + * The peer that this BlockReader will be connected to. + * You must set this. + */ + private Peer peer = null; + + /** + * The file name that this BlockReader pertains to. + * This is optional and only used for display and logging purposes. + */ private String file = null; + + /** + * The block that this BlockReader is reading. + * You must set this. + */ private ExtendedBlock block = null; + + /** + * The BlockTokenIdentifier to use, or null to use none. + */ private Token blockToken = null; + + /** + * The offset in the block to start reading at. + */ private long startOffset = 0; + + /** + * The total number of bytes we might want to read, or -1 to assume no + * limit. + */ private long len = -1; + + /** + * The buffer size to use. + * + * If this is not set, we will use the default from the Conf. + */ private int bufferSize; + + /** + * Whether or not we should verify the checksum. + * + * This is used instead of conf.verifyChecksum, because there are some + * cases when we may want to explicitly turn off checksum verification, + * such as when the caller has explicitly asked for a file to be opened + * without checksum verification. + */ private boolean verifyChecksum = true; + + /** + * Whether or not we should try to use short circuit local reads. 
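+ * When enabled, a local reader such as BlockReaderLocal may be used instead of a remote reader for blocks stored on the same host.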
+ */ private boolean shortCircuitLocalReads = false; + + /** + * The name of the client using this BlockReader, for logging and + * debugging purposes. + */ private String clientName = ""; - private DataEncryptionKey encryptionKey = null; - private IOStreamPair ioStreamPair = null; + + /** + * The DataNode on which this Block resides. + * You must set this. + */ + private DatanodeID datanodeID = null; public Params(Conf conf) { this.conf = conf; @@ -67,11 +121,11 @@ public class BlockReaderFactory { public Conf getConf() { return conf; } - public Socket getSocket() { - return socket; + public Peer getPeer() { + return peer; } - public Params setSocket(Socket socket) { - this.socket = socket; + public Params setPeer(Peer peer) { + this.peer = peer; return this; } public String getFile() { @@ -137,19 +191,12 @@ public class BlockReaderFactory { this.clientName = clientName; return this; } - public Params setEncryptionKey(DataEncryptionKey encryptionKey) { - this.encryptionKey = encryptionKey; + public Params setDatanodeID(DatanodeID datanodeID) { + this.datanodeID = datanodeID; return this; } - public DataEncryptionKey getEncryptionKey() { - return encryptionKey; - } - public IOStreamPair getIoStreamPair() { - return ioStreamPair; - } - public Params setIoStreamPair(IOStreamPair ioStreamPair) { - this.ioStreamPair = ioStreamPair; - return this; + public DatanodeID getDatanodeID() { + return datanodeID; } } @@ -164,24 +211,27 @@ public class BlockReaderFactory { */ @SuppressWarnings("deprecation") public static BlockReader newBlockReader(Params params) throws IOException { + Preconditions.checkNotNull(params.getPeer()); + Preconditions.checkNotNull(params.getBlock()); + Preconditions.checkNotNull(params.getDatanodeID()); + // First, let's set the read and write timeouts appropriately. + // This will keep us from blocking forever if something goes wrong during + // network communication. + Peer peer = params.getPeer(); + peer.setReadTimeout(params.getConf().socketTimeout); + peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT); + if (params.getConf().useLegacyBlockReader) { - if (params.getEncryptionKey() != null) { - throw new RuntimeException("Encryption is not supported with the legacy block reader."); - } + // The legacy BlockReader doesn't require that the Peers it uses + // have associated ReadableByteChannels. This makes it easier to use + // with some older Socket classes like, say, SocksSocketImpl. + // + // TODO: create a wrapper class that makes channel-less sockets look like + // they have a channel, so that we can finally remove the legacy + // RemoteBlockReader. See HDFS-2534. return RemoteBlockReader.newBlockReader(params); } else { - Socket sock = params.getSocket(); - if (params.getIoStreamPair() == null) { - params.setIoStreamPair(new IOStreamPair(NetUtils.getInputStream(sock), - NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT))); - if (params.getEncryptionKey() != null) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - params.getIoStreamPair().out, params.getIoStreamPair().in, - params.getEncryptionKey()); - params.setIoStreamPair(encryptedStreams); - } - } + // The usual block reader. 
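+ // RemoteBlockReader2 reads packets through the Peer's ReadableByteChannel (see Peer#getInputStreamChannel), which NioInetPeer-backed connections provide.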
return RemoteBlockReader2.newBlockReader(params); } } @@ -197,4 +247,4 @@ public class BlockReaderFactory { final String poolId, final long blockId) { return s.toString() + ":" + poolId + ":" + blockId; } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index 8b1f0bdc0a6..eacd902aa2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -649,7 +649,7 @@ class BlockReaderLocal implements BlockReader { } @Override - public synchronized void close() throws IOException { + public synchronized void close(PeerCache peerCache) throws IOException { dataIn.close(); if (checksumIn != null) { checksumIn.close(); @@ -675,19 +675,4 @@ class BlockReaderLocal implements BlockReader { public void readFully(byte[] buf, int off, int len) throws IOException { BlockReaderUtil.readFully(this, buf, off, len); } - - @Override - public Socket takeSocket() { - return null; - } - - @Override - public boolean hasSentStatusCode() { - return false; - } - - @Override - public IOStreamPair getStreams() { - return null; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ba31f9288c5..14dbda71744 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -191,7 +191,7 @@ public class DFSClient implements java.io.Closeable { final FileSystem.Statistics stats; final int hdfsTimeout; // timeout value for a DFS operation. 
private final String authority; - final SocketCache socketCache; + final PeerCache peerCache; final Conf dfsClientConf; private Random r = new Random(); private SocketAddress[] localInterfaceAddrs; @@ -433,7 +433,7 @@ public class DFSClient implements java.io.Closeable { Joiner.on(',').join(localInterfaceAddrs) + "]"); } - this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); + this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index e403a57b0ae..2b59eae6d5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -32,12 +32,15 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams; +import org.apache.hadoop.hdfs.net.EncryptedPeer; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.ipc.RPC; @@ -60,7 +64,7 @@ import org.apache.hadoop.security.token.Token; ****************************************************************/ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream implements ByteBufferReadable { - private final SocketCache socketCache; + private final PeerCache peerCache; private final DFSClient dfsClient; private boolean closed = false; @@ -110,7 +114,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable this.verifyChecksum = verifyChecksum; this.buffersize = buffersize; this.src = src; - this.socketCache = dfsClient.socketCache; + this.peerCache = dfsClient.peerCache; prefetchSize = dfsClient.getConf().prefetchSize; timeWindow = dfsClient.getConf().timeWindow; nCachedConnRetry = dfsClient.getConf().nCachedConnRetry; @@ -424,7 +428,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Will be getting a new BlockReader. 
if (blockReader != null) { - closeBlockReader(blockReader); + blockReader.close(peerCache); blockReader = null; } @@ -506,7 +510,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable dfsClient.checkOpen(); if (blockReader != null) { - closeBlockReader(blockReader); + blockReader.close(peerCache); blockReader = null; } super.close(); @@ -833,7 +837,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } } finally { if (reader != null) { - closeBlockReader(reader); + reader.close(peerCache); } } // Put chosen node into dead list, continue @@ -841,16 +845,30 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable } } - /** - * Close the given BlockReader and cache its socket. - */ - private void closeBlockReader(BlockReader reader) throws IOException { - if (reader.hasSentStatusCode()) { - IOStreamPair ioStreams = reader.getStreams(); - Socket oldSock = reader.takeSocket(); - socketCache.put(oldSock, ioStreams); + private Peer newPeer(InetSocketAddress addr) throws IOException { + Peer peer = null; + boolean success = false; + Socket sock = null; + try { + sock = dfsClient.socketFactory.createSocket(); + NetUtils.connect(sock, addr, + dfsClient.getRandomLocalInterfaceAddr(), + dfsClient.getConf().socketTimeout); + peer = TcpPeerServer.peerFromSocket(sock); + + // Add encryption if configured. + DataEncryptionKey key = dfsClient.getDataEncryptionKey(); + if (key != null) { + peer = new EncryptedPeer(peer, key); + } + success = true; + return peer; + } finally { + if (!success) { + IOUtils.closeQuietly(peer); + IOUtils.closeQuietly(sock); + } } - reader.close(); } /** @@ -896,40 +914,16 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable // Allow retry since there is no way of knowing whether the cached socket // is good until we actually use it. for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) { - SocketAndStreams sockAndStreams = null; + Peer peer = null; // Don't use the cache on the last attempt - it's possible that there // are arbitrarily many unusable sockets in the cache, but we don't // want to fail the read. if (retries < nCachedConnRetry) { - sockAndStreams = socketCache.get(dnAddr); + peer = peerCache.get(chosenNode); } - Socket sock; - if (sockAndStreams == null) { + if (peer == null) { + peer = newPeer(dnAddr); fromCache = false; - - sock = dfsClient.socketFactory.createSocket(); - - // TCP_NODELAY is crucial here because of bad interactions between - // Nagle's Algorithm and Delayed ACKs. With connection keepalive - // between the client and DN, the conversation looks like: - // 1. Client -> DN: Read block X - // 2. DN -> Client: data for block X - // 3. Client -> DN: Status OK (successful read) - // 4. Client -> DN: Read block Y - // The fact that step #3 and #4 are both in the client->DN direction - // triggers Nagling. If the DN is using delayed ACKs, this results - // in a delay of 40ms or more. - // - // TCP_NODELAY disables nagling and thus avoids this performance - // disaster. - sock.setTcpNoDelay(true); - - NetUtils.connect(sock, dnAddr, - dfsClient.getRandomLocalInterfaceAddr(), - dfsClient.getConf().socketTimeout); - sock.setSoTimeout(dfsClient.getConf().socketTimeout); - } else { - sock = sockAndStreams.sock; } try { @@ -939,19 +933,13 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable setFile(file).setBlock(block).setBlockToken(blockToken). setStartOffset(startOffset).setLen(len). 
setBufferSize(bufferSize).setVerifyChecksum(verifyChecksum). - setClientName(clientName). - setEncryptionKey(dfsClient.getDataEncryptionKey()). - setIoStreamPair(sockAndStreams == null ? null : sockAndStreams.ioStreams). - setSocket(sock)); + setClientName(clientName).setDatanodeID(chosenNode). + setPeer(peer)); return reader; } catch (IOException ex) { // Our socket is no good. - DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex); - if (sockAndStreams != null) { - sockAndStreams.close(); - } else { - sock.close(); - } + DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex); + IOUtils.closeQuietly(peer); err = ex; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java similarity index 61% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java index 596b0176c40..09b2ef70b14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java @@ -18,69 +18,55 @@ package org.apache.hadoop.hdfs; -import java.io.Closeable; -import java.net.Socket; -import java.net.SocketAddress; - import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; -import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; /** * A cache of input stream sockets to Data Node. */ -class SocketCache { - private static final Log LOG = LogFactory.getLog(SocketCache.class); +class PeerCache { + private static final Log LOG = LogFactory.getLog(PeerCache.class); + + private static class Value { + private final Peer peer; + private final long time; - @InterfaceAudience.Private - static class SocketAndStreams implements Closeable { - public final Socket sock; - public final IOStreamPair ioStreams; - long createTime; - - public SocketAndStreams(Socket s, IOStreamPair ioStreams) { - this.sock = s; - this.ioStreams = ioStreams; - this.createTime = Time.monotonicNow(); - } - - @Override - public void close() { - if (ioStreams != null) { - IOUtils.closeStream(ioStreams.in); - IOUtils.closeStream(ioStreams.out); - } - IOUtils.closeSocket(sock); + Value(Peer peer, long time) { + this.peer = peer; + this.time = time; } - public long getCreateTime() { - return this.createTime; + Peer getPeer() { + return peer; + } + + long getTime() { + return time; } } private Daemon daemon; /** A map for per user per datanode. 
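+ * Each DatanodeID maps to the cached Peers (and their insertion times) for that DataNode.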
*/ - private static LinkedListMultimap multimap = + private static LinkedListMultimap multimap = LinkedListMultimap.create(); private static int capacity; private static long expiryPeriod; - private static SocketCache scInstance = new SocketCache(); + private static PeerCache instance = new PeerCache(); private static boolean isInitedOnce = false; - public static synchronized SocketCache getInstance(int c, long e) { + public static synchronized PeerCache getInstance(int c, long e) { // capacity is only initialized once if (isInitedOnce == false) { capacity = c; @@ -102,7 +88,7 @@ class SocketCache { } } - return scInstance; + return instance; } private boolean isDaemonStarted() { @@ -119,44 +105,45 @@ class SocketCache { @Override public void run() { try { - SocketCache.this.run(); + PeerCache.this.run(); } catch(InterruptedException e) { //noop } finally { - SocketCache.this.clear(); + PeerCache.this.clear(); } } @Override public String toString() { - return String.valueOf(SocketCache.this); + return String.valueOf(PeerCache.this); } }); daemon.start(); } /** - * Get a cached socket to the given address. - * @param remote Remote address the socket is connected to. - * @return A socket with unknown state, possibly closed underneath. Or null. + * Get a cached peer connected to the given DataNode. + * @param dnId The DataNode to get a Peer for. + * @return An open Peer connected to the DN, or null if none + * was found. */ - public synchronized SocketAndStreams get(SocketAddress remote) { + public synchronized Peer get(DatanodeID dnId) { if (capacity <= 0) { // disabled return null; } - List sockStreamList = multimap.get(remote); + List sockStreamList = multimap.get(dnId); if (sockStreamList == null) { return null; } - Iterator iter = sockStreamList.iterator(); + Iterator iter = sockStreamList.iterator(); while (iter.hasNext()) { - SocketAndStreams candidate = iter.next(); + Value candidate = iter.next(); iter.remove(); - if (!candidate.sock.isClosed()) { - return candidate; + if (!candidate.getPeer().isClosed()) { + return candidate.getPeer(); } } return null; @@ -166,30 +153,22 @@ class SocketCache { * Give an unused socket to the cache. * @param sock socket not used by anyone. */ - public synchronized void put(Socket sock, IOStreamPair ioStreams) { - - Preconditions.checkNotNull(sock); - SocketAndStreams s = new SocketAndStreams(sock, ioStreams); + public synchronized void put(DatanodeID dnId, Peer peer) { + Preconditions.checkNotNull(dnId); + Preconditions.checkNotNull(peer); + if (peer.isClosed()) return; if (capacity <= 0) { // Cache disabled. 
- s.close(); + IOUtils.cleanup(LOG, peer); return; } startExpiryDaemon(); - SocketAddress remoteAddr = sock.getRemoteSocketAddress(); - if (remoteAddr == null) { - LOG.warn("Cannot cache (unconnected) socket with no remote address: " + - sock); - IOUtils.closeSocket(sock); - return; - } - if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, s); + multimap.put(dnId, new Value(peer, Time.monotonicNow())); } public synchronized int size() { @@ -201,18 +180,17 @@ class SocketCache { */ private synchronized void evictExpired(long expiryPeriod) { while (multimap.size() != 0) { - Iterator> iter = + Iterator> iter = multimap.entries().iterator(); - Entry entry = iter.next(); + Entry entry = iter.next(); // if oldest socket expired, remove it if (entry == null || - Time.monotonicNow() - entry.getValue().getCreateTime() < + Time.monotonicNow() - entry.getValue().getTime() < expiryPeriod) { break; } + IOUtils.cleanup(LOG, entry.getValue().getPeer()); iter.remove(); - SocketAndStreams s = entry.getValue(); - s.close(); } } @@ -220,16 +198,18 @@ class SocketCache { * Evict the oldest entry in the cache. */ private synchronized void evictOldest() { - Iterator> iter = + // We can get the oldest element immediately, because of an interesting + // property of LinkedListMultimap: its iterator traverses entries in the + // order that they were added. + Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { throw new IllegalStateException("Cannot evict from empty cache! " + "capacity: " + capacity); } - Entry entry = iter.next(); + Entry entry = iter.next(); + IOUtils.cleanup(LOG, entry.getValue().getPeer()); iter.remove(); - SocketAndStreams s = entry.getValue(); - s.close(); } /** @@ -253,9 +233,10 @@ class SocketCache { /** * Empty the cache, and close all sockets. 
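+ * Package-private and annotated @VisibleForTesting so that tests can reset the singleton's state.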
*/ - private synchronized void clear() { - for (SocketAndStreams sockAndStream : multimap.values()) { - sockAndStream.close(); + @VisibleForTesting + synchronized void clear() { + for (Value value : multimap.values()) { + IOUtils.cleanup(LOG, value.getPeer()); } multimap.clear(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index f380d818ba4..4ec8a3f2b9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -25,25 +25,20 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -56,7 +51,8 @@ import org.apache.hadoop.util.DataChecksum; @Deprecated public class RemoteBlockReader extends FSInputChecker implements BlockReader { - Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. 
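+ // The Peer we read from, plus the DatanodeID it is connected to; the DatanodeID keys the PeerCache and appears in log messages.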
+ private final Peer peer; + private final DatanodeID datanodeID; private final DataInputStream in; private DataChecksum checksum; @@ -126,9 +122,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // if eos was set in the previous read, send a status code to the DN if (eos && !eosBefore && nRead >= 0) { if (needChecksum()) { - sendReadResult(dnSock, Status.CHECKSUM_OK); + sendReadResult(peer, Status.CHECKSUM_OK); } else { - sendReadResult(dnSock, Status.SUCCESS); + sendReadResult(peer, Status.SUCCESS); } } return nRead; @@ -322,7 +318,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + long startOffset, long firstChunkOffset, long bytesToRead, + Peer peer, DatanodeID datanodeID) { // Path is used only for printing block and file information in debug super(new Path("/blk_" + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, 1, verifyChecksum, @@ -330,7 +327,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { checksum.getBytesPerChecksum(), checksum.getChecksumSize()); - this.dnSock = dnSock; + this.peer = peer; + this.datanodeID = datanodeID; this.in = in; this.checksum = checksum; this.startOffset = Math.max( startOffset, 0 ); @@ -367,9 +365,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public static RemoteBlockReader newBlockReader(BlockReaderFactory.Params params) throws IOException { // in and out will be closed when sock is closed (by the caller) - Socket sock = params.getSocket(); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT))); + params.getPeer().getOutputStream())); new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), params.getClientName(), params.getStartOffset(), params.getLen()); @@ -377,13 +374,13 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { // Get bytes in block, set streams // DataInputStream in = new DataInputStream( - new BufferedInputStream(NetUtils.getInputStream(sock), + new BufferedInputStream(params.getPeer().getInputStream(), params.getBufferSize())); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); - RemoteBlockReader2.checkSuccess(status, sock, params.getBlock(), - params.getFile()); + RemoteBlockReader2.checkSuccess(status, params.getPeer(), + params.getBlock(), params.getFile()); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( @@ -402,18 +399,20 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return new RemoteBlockReader(params.getFile(), params.getBlock().getBlockPoolId(), params.getBlock().getBlockId(), in, checksum, params.getVerifyChecksum(), - params.getStartOffset(), firstChunkOffset, params.getLen(), sock); + params.getStartOffset(), firstChunkOffset, params.getLen(), + params.getPeer(), params.getDatanodeID()); } @Override - public synchronized void close() throws IOException { + public synchronized void close(PeerCache peerCache) throws IOException { startOffset = -1; checksum = null; - if (dnSock != null) { - dnSock.close(); + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else 
{ + peer.close(); } - - // in will be closed when its Socket is closed. + // in will be closed when its Peer is closed. } @Override @@ -427,37 +426,21 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return readFully(this, buf, offset, len); } - @Override - public Socket takeSocket() { - assert hasSentStatusCode() : - "BlockReader shouldn't give back sockets mid-read"; - Socket res = dnSock; - dnSock = null; - return res; - } - - @Override - public boolean hasSentStatusCode() { - return sentStatusCode; - } - /** * When the reader reaches end of the read, it sends a status response * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN * closing our connection (which we will re-open), but won't affect * data correctness. */ - void sendReadResult(Socket sock, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + sock; + void sendReadResult(Peer peer, Status statusCode) { + assert !sentStatusCode : "already sent status code to " + peer; try { - RemoteBlockReader2.writeReadResult( - NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT), - statusCode); + RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - sock.getInetAddress() + ": " + e.getMessage()); + datanodeID + ": " + e.getMessage()); } } @@ -477,12 +460,4 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public int read(ByteBuffer buf) throws IOException { throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); } - - @Override - public IOStreamPair getStreams() { - // This class doesn't support encryption, which is the only thing this - // method is used for. See HDFS-3637. 
- return null; - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index c250a538984..6df4332d950 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -25,16 +25,15 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -42,13 +41,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.net.SocketInputWrapper; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; + /** * This is a wrapper around connection to datanode * and understands checksum, offset etc. @@ -79,11 +76,8 @@ import org.apache.hadoop.util.DataChecksum; public class RemoteBlockReader2 implements BlockReader { static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); - - Socket dnSock; - // for now just sending the status code (e.g. checksumOk) after the read. - private IOStreamPair ioStreams; - private final ReadableByteChannel in; + private final DatanodeID datanodeID; + private final Peer peer; private DataChecksum checksum; private PacketReceiver packetReceiver = new PacketReceiver(true); @@ -115,6 +109,11 @@ public class RemoteBlockReader2 implements BlockReader { /** Amount of unread data in the current received packet */ int dataLeft = 0; + @VisibleForTesting + public Peer getPeer() { + return peer; + } + @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { @@ -155,7 +154,7 @@ public class RemoteBlockReader2 implements BlockReader { private void readNextPacket() throws IOException { //Read packet headers. 
- packetReceiver.receiveNextPacket(in); + packetReceiver.receiveNextPacket(peer.getInputStreamChannel()); PacketHeader curHeader = packetReceiver.getHeader(); curDataSlice = packetReceiver.getDataSlice(); @@ -236,7 +235,7 @@ public class RemoteBlockReader2 implements BlockReader { LOG.trace("Reading empty packet at end of read"); } - packetReceiver.receiveNextPacket(in); + packetReceiver.receiveNextPacket(peer.getInputStreamChannel()); PacketHeader trailer = packetReceiver.getHeader(); if (!trailer.isLastPacketInBlock() || @@ -247,11 +246,10 @@ public class RemoteBlockReader2 implements BlockReader { } protected RemoteBlockReader2(BlockReaderFactory.Params params, - DataChecksum checksum, long firstChunkOffset, ReadableByteChannel in) { + DataChecksum checksum, long firstChunkOffset) { // Path is used only for printing block and file information in debug - this.dnSock = params.getSocket(); - this.ioStreams = params.getIoStreamPair(); - this.in = in; + this.datanodeID = params.getDatanodeID(); + this.peer = params.getPeer(); this.checksum = checksum; this.verifyChecksum = params.getVerifyChecksum(); this.startOffset = Math.max( params.getStartOffset(), 0 ); @@ -268,38 +266,19 @@ public class RemoteBlockReader2 implements BlockReader { @Override - public synchronized void close() throws IOException { + public synchronized void close(PeerCache peerCache) throws IOException { packetReceiver.close(); startOffset = -1; checksum = null; - if (dnSock != null) { - dnSock.close(); + if (peerCache != null && sentStatusCode) { + peerCache.put(datanodeID, peer); + } else { + peer.close(); } // in will be closed when its Socket is closed. } - - /** - * Take the socket used to talk to the DN. - */ - @Override - public Socket takeSocket() { - assert hasSentStatusCode() : - "BlockReader shouldn't give back sockets mid-read"; - Socket res = dnSock; - dnSock = null; - return res; - } - - /** - * Whether the BlockReader has reached the end of its input stream - * and successfully sent a status code back to the datanode. - */ - @Override - public boolean hasSentStatusCode() { - return sentStatusCode; - } /** * When the reader reaches end of the read, it sends a status response @@ -308,14 +287,14 @@ public class RemoteBlockReader2 implements BlockReader { * data correctness. */ void sendReadResult(Status statusCode) { - assert !sentStatusCode : "already sent status code to " + dnSock; + assert !sentStatusCode : "already sent status code to " + peer; try { - writeReadResult(ioStreams.out, statusCode); + writeReadResult(peer.getOutputStream(), statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. 
LOG.info("Could not send read status (" + statusCode + ") to datanode " + - dnSock.getInetAddress() + ": " + e.getMessage()); + peer.getRemoteAddressString() + ": " + e.getMessage()); } } @@ -373,29 +352,20 @@ public class RemoteBlockReader2 implements BlockReader { */ public static BlockReader newBlockReader(BlockReaderFactory.Params params) throws IOException { - IOStreamPair ioStreams = params.getIoStreamPair(); - ReadableByteChannel ch; - if (ioStreams.in instanceof SocketInputWrapper) { - ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel(); - } else { - ch = (ReadableByteChannel) ioStreams.in; - } - // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - ioStreams.out)); + params.getPeer().getOutputStream())); new Sender(out).readBlock(params.getBlock(), params.getBlockToken(), params.getClientName(), params.getStartOffset(), params.getLen()); // // Get bytes in block // - DataInputStream in = new DataInputStream(ioStreams.in); + DataInputStream in = new DataInputStream(params.getPeer().getInputStream()); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); - checkSuccess(status, params.getSocket(), params.getBlock(), - params.getFile()); + checkSuccess(status, params.getPeer(), params.getBlock(), params.getFile()); ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo(); DataChecksum checksum = DataTransferProtoUtil.fromProto( @@ -412,33 +382,28 @@ public class RemoteBlockReader2 implements BlockReader { params.getStartOffset() + " for file " + params.getFile()); } - return new RemoteBlockReader2(params, checksum, firstChunkOffset, ch); + return new RemoteBlockReader2(params, checksum, firstChunkOffset); } static void checkSuccess( - BlockOpResponseProto status, Socket sock, + BlockOpResponseProto status, Peer peer, ExtendedBlock block, String file) throws IOException { if (status.getStatus() != Status.SUCCESS) { if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file + + peer.getLocalAddressString() + ", remote=" + + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } else { throw new IOException("Got error for OP_READ_BLOCK, self=" - + sock.getLocalSocketAddress() + ", remote=" - + sock.getRemoteSocketAddress() + ", for file " + file + + peer.getLocalAddressString() + ", remote=" + + peer.getRemoteAddressString() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp()); } } } - - @Override - public IOStreamPair getStreams() { - return ioStreams; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java new file mode 100644 index 00000000000..eb2d0c92d9e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.ReadableByteChannel; + +/** + * Represents a peer that we communicate with by using a basic Socket + * that has no associated Channel. + * + */ +class BasicInetPeer implements Peer { + private final Socket socket; + private final OutputStream out; + private final InputStream in; + private final boolean isLocal; + + public BasicInetPeer(Socket socket) throws IOException { + this.socket = socket; + this.out = socket.getOutputStream(); + this.in = socket.getInputStream(); + this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + /* + * This Socket has no channel, so there's nothing to return here. + */ + return null; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + socket.setSoTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return socket.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) { + /* + * We can't implement write timeouts. :( + * + * Java provides no facility to set a blocking write timeout on a Socket. + * You can simulate a blocking write with a timeout by using + * non-blocking I/O. However, we can't use nio here, because this Socket + * doesn't have an associated Channel. + * + * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for + * more details. + */ + } + + @Override + public boolean isClosed() { + return socket.isClosed(); + } + + @Override + public void close() throws IOException { + socket.close(); + } + + @Override + public String getRemoteAddressString() { + return socket.getRemoteSocketAddress().toString(); + } + + @Override + public String getLocalAddressString() { + return socket.getLocalSocketAddress().toString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public String toString() { + return "BasicInetPeer(" + socket.toString() + ")"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java new file mode 100644 index 00000000000..295632ca63d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; + +/** + * Represents a peer that we communicate with by using an encrypted + * communications medium. + */ +@InterfaceAudience.Private +public class EncryptedPeer implements Peer { + private final Peer enclosedPeer; + + /** + * An encrypted InputStream. + */ + private final InputStream in; + + /** + * An encrypted OutputStream. + */ + private final OutputStream out; + + /** + * An encrypted ReadableByteChannel. + */ + private final ReadableByteChannel channel; + + public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key) + throws IOException { + this.enclosedPeer = enclosedPeer; + IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams( + enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key); + this.in = ios.in; + this.out = ios.out; + this.channel = ios.in instanceof ReadableByteChannel ? 
+ (ReadableByteChannel)ios.in : null; + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return channel; + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + enclosedPeer.setReadTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return enclosedPeer.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return enclosedPeer.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + enclosedPeer.setWriteTimeout(timeoutMs); + } + + @Override + public boolean isClosed() { + return enclosedPeer.isClosed(); + } + + @Override + public void close() throws IOException { + try { + in.close(); + } finally { + try { + out.close(); + } finally { + enclosedPeer.close(); + } + } + } + + @Override + public String getRemoteAddressString() { + return enclosedPeer.getRemoteAddressString(); + } + + @Override + public String getLocalAddressString() { + return enclosedPeer.getLocalAddressString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return enclosedPeer.isLocal(); + } + + @Override + public String toString() { + return "EncryptedPeer(" + enclosedPeer + ")"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java new file mode 100644 index 00000000000..14b0d01f047 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.ReadableByteChannel; + +import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.net.SocketOutputStream; + +/** + * Represents a peer that we communicate with by using non-blocking I/O + * on a Socket. + */ +class NioInetPeer implements Peer { + private final Socket socket; + + /** + * An InputStream which simulates blocking I/O with timeouts using NIO. + */ + private final SocketInputStream in; + + /** + * An OutputStream which simulates blocking I/O with timeouts using NIO. 
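+ * Because writes go through this stream, setWriteTimeout can be honored here, unlike in BasicInetPeer.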
+ */ + private final SocketOutputStream out; + + private final boolean isLocal; + + NioInetPeer(Socket socket) throws IOException { + this.socket = socket; + this.in = new SocketInputStream(socket.getChannel(), 0); + this.out = new SocketOutputStream(socket.getChannel(), 0); + this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + return socket.getChannel(); + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + in.setTimeout(timeoutMs); + } + + @Override + public int getReceiveBufferSize() throws IOException { + return socket.getReceiveBufferSize(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return socket.getTcpNoDelay(); + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + out.setTimeout(timeoutMs); + } + + @Override + public boolean isClosed() { + return socket.isClosed(); + } + + @Override + public void close() throws IOException { + // We always close the outermost streams-- in this case, 'in' and 'out' + // Closing either one of these will also close the Socket. + try { + in.close(); + } finally { + out.close(); + } + } + + @Override + public String getRemoteAddressString() { + return socket.getRemoteSocketAddress().toString(); + } + + @Override + public String getLocalAddressString() { + return socket.getLocalSocketAddress().toString(); + } + + @Override + public InputStream getInputStream() throws IOException { + return in; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public String toString() { + return "NioInetPeer(" + socket.toString() + ")"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java new file mode 100644 index 00000000000..129ada76116 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Represents a connection to a peer. + */ +@InterfaceAudience.Private +public interface Peer extends Closeable { + /** + * @return The input stream channel associated with this + * peer, or null if it has none. 
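+ * (For example, BasicInetPeer returns null here, while NioInetPeer returns the underlying SocketChannel.)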
+ */ + public ReadableByteChannel getInputStreamChannel(); + + /** + * Set the read timeout on this peer. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setReadTimeout(int timeoutMs) throws IOException; + + /** + * @return The receive buffer size. + */ + public int getReceiveBufferSize() throws IOException; + + /** + * @return True if TCP_NODELAY is turned on. + */ + public boolean getTcpNoDelay() throws IOException; + + /** + * Set the write timeout on this peer. + * + * Note: this is not honored for BasicInetPeer. + * See {@link BasicInetPeer#setWriteTimeout} for details. + * + * @param timeoutMs The timeout in milliseconds. + */ + public void setWriteTimeout(int timeoutMs) throws IOException; + + /** + * @return true only if the peer is closed. + */ + public boolean isClosed(); + + /** + * Close the peer. + * + * It's safe to re-close a Peer that is already closed. + */ + public void close() throws IOException; + + /** + * @return A string representing the remote end of our + * connection to the peer. + */ + public String getRemoteAddressString(); + + /** + * @return A string representing the local end of our + * connection to the peer. + */ + public String getLocalAddressString(); + + /** + * @return An InputStream associated with the Peer. + * This InputStream will be valid until you close + * this peer with Peer#close. + */ + public InputStream getInputStream() throws IOException; + + /** + * @return An OutputStream associated with the Peer. + * This OutputStream will be valid until you close + * this peer with Peer#close. + */ + public OutputStream getOutputStream() throws IOException; + + /** + * @return True if the peer resides on the same + * computer as we do. + */ + public boolean isLocal(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java new file mode 100644 index 00000000000..c7b6b14df49 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.Closeable; +import org.apache.hadoop.classification.InterfaceAudience; +import java.io.IOException; +import java.net.SocketTimeoutException; + +@InterfaceAudience.Private +public interface PeerServer extends Closeable { + /** + * Set the receive buffer size of the PeerServer. + * + * @param size The receive buffer size. + */ + public void setReceiveBufferSize(int size) throws IOException; + + /** + * Listens for a connection to be made to this server and accepts + * it. The method blocks until a connection is made. 
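+ * TcpPeerServer, for example, wraps each accepted Socket in a Peer via peerFromSocket.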
+ * + * @exception IOException if an I/O error occurs when waiting for a + * connection. + * @exception SecurityException if a security manager exists and its + * checkAccept method doesn't allow the operation. + * @exception SocketTimeoutException if a timeout was previously set and + * the timeout has been reached. + */ + public Peer accept() throws IOException, SocketTimeoutException; + + /** + * @return A string representation of the address we're + * listening on. + */ + public String getListeningString(); + + /** + * Free the resources associated with this peer server. + * This normally includes sockets, etc. + * + * @throws IOException If there is an error closing the PeerServer + */ + public void close() throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java new file mode 100644 index 00000000000..29d86634f29 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.Server; + +@InterfaceAudience.Private +public class TcpPeerServer implements PeerServer { + static Log LOG = LogFactory.getLog(TcpPeerServer.class); + + private final ServerSocket serverSocket; + + public static Peer peerFromSocket(Socket socket) + throws IOException { + Peer peer = null; + boolean success = false; + try { + // TCP_NODELAY is crucial here because of bad interactions between + // Nagle's Algorithm and Delayed ACKs. With connection keepalive + // between the client and DN, the conversation looks like: + // 1. Client -> DN: Read block X + // 2. DN -> Client: data for block X + // 3. Client -> DN: Status OK (successful read) + // 4. Client -> DN: Read block Y + // The fact that step #3 and #4 are both in the client->DN direction + // triggers Nagling. If the DN is using delayed ACKs, this results + // in a delay of 40ms or more. + // + // TCP_NODELAY disables nagling and thus avoids this performance + // disaster. 
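The PeerServer contract is easiest to see as a small accept loop. The sketch below is illustrative only; the real loop, including xceiver-count checks and daemon threads, is in DataXceiverServer.run() later in this patch:

    import java.io.IOException;
    import java.net.SocketTimeoutException;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.net.PeerServer;

    // Illustrative accept loop over a PeerServer; error handling is simplified.
    class AcceptLoop implements Runnable {
      private final PeerServer server;
      private volatile boolean shouldRun = true;

      AcceptLoop(PeerServer server) { this.server = server; }

      @Override
      public void run() {
        while (shouldRun) {
          try {
            Peer peer = server.accept();   // blocks until a connection arrives
            handle(peer);                  // real code hands the peer to a new daemon thread
          } catch (SocketTimeoutException ignored) {
            // accept() timed out; loop around and re-check shouldRun
          } catch (IOException e) {
            shouldRun = false;
          }
        }
      }

      private void handle(Peer peer) {
        // read ops from peer.getInputStream(), reply on peer.getOutputStream() ...
      }
    }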
+ socket.setTcpNoDelay(true); + SocketChannel channel = socket.getChannel(); + if (channel == null) { + peer = new BasicInetPeer(socket); + } else { + peer = new NioInetPeer(socket); + } + success = true; + return peer; + } finally { + if (!success) { + if (peer != null) peer.close(); + socket.close(); + } + } + } + + public static Peer peerFromSocketAndKey(Socket s, + DataEncryptionKey key) throws IOException { + Peer peer = null; + boolean success = false; + try { + peer = peerFromSocket(s); + if (key != null) { + peer = new EncryptedPeer(peer, key); + } + success = true; + return peer; + } finally { + if (!success) { + IOUtils.cleanup(null, peer); + } + } + } + + /** + * Create a non-secure TcpPeerServer. + * + * @param socketWriteTimeout The Socket write timeout in ms. + * @param bindAddr The address to bind to. + * @throws IOException + */ + public TcpPeerServer(int socketWriteTimeout, + InetSocketAddress bindAddr) throws IOException { + this.serverSocket = (socketWriteTimeout > 0) ? + ServerSocketChannel.open().socket() : new ServerSocket(); + Server.bind(serverSocket, bindAddr, 0); + } + + /** + * Create a secure TcpPeerServer. + * + * @param secureResources Security resources. + */ + public TcpPeerServer(SecureResources secureResources) { + this.serverSocket = secureResources.getStreamingSocket(); + } + + /** + * @return the IP address which this TcpPeerServer is listening on. + */ + public InetSocketAddress getStreamingAddr() { + return new InetSocketAddress( + serverSocket.getInetAddress().getHostAddress(), + serverSocket.getLocalPort()); + } + + @Override + public void setReceiveBufferSize(int size) throws IOException { + this.serverSocket.setReceiveBufferSize(size); + } + + @Override + public Peer accept() throws IOException, SocketTimeoutException { + Peer peer = peerFromSocket(serverSocket.accept()); + return peer; + } + + @Override + public String getListeningString() { + return serverSocket.getLocalSocketAddress().toString(); + } + + @Override + public void close() throws IOException { + try { + serverSocket.close(); + } catch(IOException e) { + LOG.error("error closing TcpPeerServer: ", e); + } + } + + @Override + public String toString() { + return "TcpPeerServer(" + getListeningString() + ")"; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java index 2cdfae7193f..3f9edd3352b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hdfs.BlockReader; import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -206,12 +208,14 @@ public class JspHelper { // Use the block name for file name. BlockReader blockReader = BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(s). + setPeer(TcpPeerServer.peerFromSocketAndKey(s, encryptionKey)). setBlockToken(blockToken).setStartOffset(offsetIntoBlock). setLen(amtToRead). 
- setEncryptionKey(encryptionKey). setFile(BlockReaderFactory.getFileName(addr, poolId, blockId)). - setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp))); + setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)). + setDatanodeID(new DatanodeID(addr.getAddress().toString(), + addr.getHostName(), poolId, addr.getPort(), 0, 0))); + byte[] buf = new byte[(int)amtToRead]; int readOffset = 0; int retries = 2; @@ -229,8 +233,7 @@ public class JspHelper { amtToRead -= numRead; readOffset += numRead; } - blockReader = null; - s.close(); + blockReader.close(null); out.print(HtmlQuoting.quoteHtmlChars(new String(buf))); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ab6551f57f5..e7bf6dad44f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -522,24 +523,19 @@ public class DataNode extends Configured private void initDataXceiver(Configuration conf) throws IOException { // find free port or use privileged port provided - ServerSocket ss; - if (secureResources == null) { - InetSocketAddress addr = DataNode.getStreamingAddr(conf); - ss = (dnConf.socketWriteTimeout > 0) ? 
- ServerSocketChannel.open().socket() : new ServerSocket(); - Server.bind(ss, addr, 0); + TcpPeerServer tcpPeerServer; + if (secureResources != null) { + tcpPeerServer = new TcpPeerServer(secureResources); } else { - ss = secureResources.getStreamingSocket(); + tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, + DataNode.getStreamingAddr(conf)); } - ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - - streamingAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(), - ss.getLocalPort()); - + tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); this.dataXceiverServer = new Daemon(threadGroup, - new DataXceiverServer(ss, conf, this)); + new DataXceiverServer(tcpPeerServer, conf, this)); this.threadGroup.setDaemon(true); // auto destroy when empty } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 31b896caf93..d618c787d13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelException; import java.util.Arrays; import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -64,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -79,8 +79,7 @@ class DataXceiver extends Receiver implements Runnable { public static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; - private final Socket s; - private final boolean isLocal; //is a local connection? 
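The initDataXceiver() rewrite above reduces the DataNode's listener setup to the TcpPeerServer API. A standalone, non-secure sketch of the same sequence (port, timeout, and buffer size are example values, not defaults):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;

    public class TcpPeerServerSetup {        // hypothetical example class
      public static void main(String[] args) throws Exception {
        TcpPeerServer server = new TcpPeerServer(
            5000 /* socket write timeout, ms */,
            new InetSocketAddress("0.0.0.0", 50010));
        server.setReceiveBufferSize(128 * 1024);
        System.out.println("listening on " + server.getStreamingAddr());
        server.close();
      }
    }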
+ private final Peer peer; private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; @@ -88,7 +87,7 @@ class DataXceiver extends Receiver implements Runnable { private final DataXceiverServer dataXceiverServer; private final boolean connectToDnViaHostname; private long opStartTime; //the start time of receiving an Op - private final SocketInputWrapper socketIn; + private final InputStream socketIn; private OutputStream socketOut; /** @@ -97,25 +96,23 @@ class DataXceiver extends Receiver implements Runnable { */ private String previousOpClientName; - public static DataXceiver create(Socket s, DataNode dn, + public static DataXceiver create(Peer peer, DataNode dn, DataXceiverServer dataXceiverServer) throws IOException { - return new DataXceiver(s, dn, dataXceiverServer); + return new DataXceiver(peer, dn, dataXceiverServer); } - private DataXceiver(Socket s, - DataNode datanode, + private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException { - this.s = s; + this.peer = peer; this.dnConf = datanode.getDnConf(); - this.socketIn = NetUtils.getInputStream(s); - this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout); - this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); + this.socketIn = peer.getInputStream(); + this.socketOut = peer.getOutputStream(); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; - remoteAddress = s.getRemoteSocketAddress().toString(); - localAddress = s.getLocalSocketAddress().toString(); + remoteAddress = peer.getRemoteAddressString(); + localAddress = peer.getLocalAddressString(); if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " @@ -155,11 +152,10 @@ class DataXceiver extends Receiver implements Runnable { public void run() { int opsProcessed = 0; Op op = null; - - dataXceiverServer.childSockets.add(s); - + + dataXceiverServer.addPeer(peer); try { - + peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; if (dnConf.encryptDataTransfer) { IOStreamPair encryptedStreams = null; @@ -169,8 +165,9 @@ class DataXceiver extends Receiver implements Runnable { dnConf.encryptionAlgorithm); } catch (InvalidMagicNumberException imne) { LOG.info("Failed to read expected encryption handshake from client " + - "at " + s.getInetAddress() + ". Perhaps the client is running an " + - "older version of Hadoop which does not support encryption"); + "at " + peer.getRemoteAddressString() + ". 
Perhaps the client " + + "is running an older version of Hadoop which does not support " + + "encryption"); return; } input = encryptedStreams.in; @@ -189,9 +186,9 @@ class DataXceiver extends Receiver implements Runnable { try { if (opsProcessed != 0) { assert dnConf.socketKeepaliveTimeout > 0; - socketIn.setTimeout(dnConf.socketKeepaliveTimeout); + peer.setReadTimeout(dnConf.socketKeepaliveTimeout); } else { - socketIn.setTimeout(dnConf.socketTimeout); + peer.setReadTimeout(dnConf.socketTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -202,7 +199,7 @@ class DataXceiver extends Receiver implements Runnable { if (opsProcessed > 0 && (err instanceof EOFException || err instanceof ClosedChannelException)) { if (LOG.isDebugEnabled()) { - LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops"); + LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops"); } } else { throw err; @@ -212,13 +209,13 @@ class DataXceiver extends Receiver implements Runnable { // restore normal timeout if (opsProcessed != 0) { - s.setSoTimeout(dnConf.socketTimeout); + peer.setReadTimeout(dnConf.socketTimeout); } opStartTime = now(); processOp(op); ++opsProcessed; - } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); + } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0); } catch (Throwable t) { LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " + ((op == null) ? "unknown" : op.name()) + " operation " + @@ -230,9 +227,8 @@ class DataXceiver extends Receiver implements Runnable { + datanode.getXceiverCount()); } updateCurrentThreadName("Cleaning up"); + dataXceiverServer.closePeer(peer); IOUtils.closeStream(in); - IOUtils.closeSocket(s); - dataXceiverServer.childSockets.remove(s); } } @@ -286,8 +282,9 @@ class DataXceiver extends Receiver implements Runnable { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( HdfsProtoUtil.vintPrefixed(in)); if (!stat.hasStatus()) { - LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + - "code after reading. Will close connection."); + LOG.warn("Client " + peer.getRemoteAddressString() + + " did not send a valid status code after reading. 
" + + "Will close connection."); IOUtils.closeStream(out); } } catch (IOException ioe) { @@ -320,7 +317,7 @@ class DataXceiver extends Receiver implements Runnable { //update metrics datanode.metrics.addReadBlockOp(elapsed()); - datanode.metrics.incrReadsFromClient(isLocal); + datanode.metrics.incrReadsFromClient(peer.isLocal()); } @Override @@ -358,8 +355,8 @@ class DataXceiver extends Receiver implements Runnable { LOG.debug("isDatanode=" + isDatanode + ", isClient=" + isClient + ", isTransfer=" + isTransfer); - LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + - " tcp no delay " + s.getTcpNoDelay()); + LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() + + " tcp no delay " + peer.getTcpNoDelay()); } // We later mutate block's generation stamp and length, but we need to @@ -390,8 +387,8 @@ class DataXceiver extends Receiver implements Runnable { stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver blockReceiver = new BlockReceiver(block, in, - s.getRemoteSocketAddress().toString(), - s.getLocalSocketAddress().toString(), + peer.getRemoteAddressString(), + peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum); } else { @@ -546,7 +543,7 @@ class DataXceiver extends Receiver implements Runnable { //update metrics datanode.metrics.addWriteBlockOp(elapsed()); - datanode.metrics.incrWritesFromClient(isLocal); + datanode.metrics.incrWritesFromClient(peer.isLocal()); } @Override @@ -554,7 +551,7 @@ class DataXceiver extends Receiver implements Runnable { final Token blockToken, final String clientName, final DatanodeInfo[] targets) throws IOException { - checkAccess(null, true, blk, blockToken, + checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); @@ -641,8 +638,9 @@ class DataXceiver extends Receiver implements Runnable { } if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start - String msg = "Not able to copy block " + block.getBlockId() + " to " - + s.getRemoteSocketAddress() + " because threads quota is exceeded."; + String msg = "Not able to copy block " + block.getBlockId() + " " + + "to " + peer.getRemoteAddressString() + " because threads " + + "quota is exceeded."; LOG.info(msg); sendResponse(ERROR, msg); return; @@ -671,7 +669,7 @@ class DataXceiver extends Receiver implements Runnable { datanode.metrics.incrBytesRead((int) read); datanode.metrics.incrBlocksRead(); - LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress()); + LOG.info("Copied " + block + " to " + peer.getRemoteAddressString()); } catch (IOException ioe) { isOpSuccess = false; LOG.info("opCopyBlock " + block + " received exception " + ioe); @@ -716,8 +714,9 @@ class DataXceiver extends Receiver implements Runnable { } if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start - String msg = "Not able to receive block " + block.getBlockId() + " from " - + s.getRemoteSocketAddress() + " because threads quota is exceeded."; + String msg = "Not able to receive block " + block.getBlockId() + + " from " + peer.getRemoteAddressString() + " because threads " + + "quota is exceeded."; LOG.warn(msg); sendResponse(ERROR, msg); return; @@ -794,7 +793,7 @@ class DataXceiver extends Receiver implements Runnable { // notify name node datanode.notifyNamenodeReceivedBlock(block, delHint); - 
LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress()); + LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()); } catch (IOException ioe) { opStatus = ERROR; @@ -817,7 +816,7 @@ class DataXceiver extends Receiver implements Runnable { try { sendResponse(opStatus, errMsg); } catch (IOException ioe) { - LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); + LOG.warn("Error writing reply back to " + peer.getRemoteAddressString()); } IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); @@ -871,7 +870,7 @@ class DataXceiver extends Receiver implements Runnable { } - private void checkAccess(DataOutputStream out, final boolean reply, + private void checkAccess(OutputStream out, final boolean reply, final ExtendedBlock blk, final Token t, final Op op, @@ -886,11 +885,6 @@ class DataXceiver extends Receiver implements Runnable { } catch(InvalidToken e) { try { if (reply) { - if (out == null) { - out = new DataOutputStream( - NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); - } - BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() .setStatus(ERROR_ACCESS_TOKEN); if (mode == BlockTokenSecretManager.AccessMode.WRITE) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index bb0f7fd81b4..2755eb415f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -18,18 +18,16 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; -import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.PeerServer; import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon; class DataXceiverServer implements Runnable { public static final Log LOG = DataNode.LOG; - ServerSocket ss; - DataNode datanode; - // Record all sockets opened for data transfer - Set childSockets = Collections.synchronizedSet( - new HashSet()); + private final PeerServer peerServer; + private final DataNode datanode; + private final Set peers = new HashSet(); /** * Maximal number of concurrent xceivers per node. 
@@ -109,10 +105,10 @@ class DataXceiverServer implements Runnable { long estimateBlockSize; - DataXceiverServer(ServerSocket ss, Configuration conf, + DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) { - this.ss = ss; + this.peerServer = peerServer; this.datanode = datanode; this.maxXceiverCount = @@ -130,12 +126,10 @@ class DataXceiverServer implements Runnable { @Override public void run() { + Peer peer = null; while (datanode.shouldRun) { - Socket s = null; try { - s = ss.accept(); - s.setTcpNoDelay(true); - // Timeouts are set within DataXceiver.run() + peer = peerServer.accept(); // Make sure the xceiver count is not exceeded int curXceiverCount = datanode.getXceiverCount(); @@ -146,7 +140,7 @@ class DataXceiverServer implements Runnable { } new Daemon(datanode.threadGroup, - DataXceiver.create(s, datanode, this)) + DataXceiver.create(peer, datanode, this)) .start(); } catch (SocketTimeoutException ignored) { // wake up to see if should continue to run @@ -157,10 +151,10 @@ class DataXceiverServer implements Runnable { LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace); } } catch (IOException ie) { - IOUtils.closeSocket(s); + IOUtils.cleanup(null, peer); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie); } catch (OutOfMemoryError ie) { - IOUtils.closeSocket(s); + IOUtils.cleanup(null, peer); // DataNode can run out of memory if there is too many transfers. // Log the event, Sleep for 30 seconds, other transfers may complete by // then. @@ -176,33 +170,35 @@ class DataXceiverServer implements Runnable { datanode.shouldRun = false; } } + synchronized (this) { + for (Peer p : peers) { + IOUtils.cleanup(LOG, p); + } + } try { - ss.close(); + peerServer.close(); } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + " :DataXceiverServer: close exception", ie); } } - + void kill() { assert datanode.shouldRun == false : "shoudRun should be set to false before killing"; try { - this.ss.close(); + this.peerServer.close(); } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie); } + } + + synchronized void addPeer(Peer peer) { + peers.add(peer); + } - // close all the sockets that were accepted earlier - synchronized (childSockets) { - for (Iterator it = childSockets.iterator(); - it.hasNext();) { - Socket thissock = it.next(); - try { - thissock.close(); - } catch (IOException e) { - } - } - } + synchronized void closePeer(Peer peer) { + peers.remove(peer); + IOUtils.cleanup(null, peer); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 32c643b560b..8d16e950741 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -559,13 +560,13 @@ public class NamenodeFsck { blockReader = BlockReaderFactory.newBlockReader( new 
BlockReaderFactory.Params(new Conf(conf)). - setSocket(s).setBlock(block). + setPeer(TcpPeerServer.peerFromSocketAndKey(s, + namenode.getRpcServer().getDataEncryptionKey())). + setBlock(block). setFile(BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), block.getBlockId())). setBlockToken(lblock.getBlockToken()). - setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()). - setLen(-1)); - + setDatanodeID(chosenNode)); } catch (IOException ex) { // Put chosen node into dead list, continue LOG.info("Failed to connect to " + targetAddr + ":" + ex); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index be75e1cd1a5..14b975bb3ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -152,13 +153,13 @@ public class BlockReaderTestUtil { return BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(sock). + setPeer(TcpPeerServer.peerFromSocket(sock)). setFile(targetAddr.toString() + ":" + block.getBlockId()). setBlock(block).setBlockToken(testBlock.getBlockToken()). setStartOffset(offset).setLen(lenToRead). setBufferSize(conf.getInt( CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)). 
- setVerifyChecksum(true)); + setVerifyChecksum(true).setDatanodeID(nodes[0])); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java index 8dd3d6fd38a..2a0e0a85565 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java @@ -61,7 +61,7 @@ public class TestClientBlockVerification { util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -76,7 +76,7 @@ public class TestClientBlockVerification { // We asked the blockreader for the whole file, and only read // half of it, so no CHECKSUM_OK verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -92,7 +92,7 @@ public class TestClientBlockVerification { // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -111,7 +111,7 @@ public class TestClientBlockVerification { util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index d9020e0bd0a..b140bae1109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -18,28 +18,20 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.spy; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.PrivilegedExceptionAction; + +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.security.token.Token; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; @@ -55,59 +47,31 @@ public class TestConnCache { static final int BLOCK_SIZE = 4096; static final int FILE_SIZE = 3 * BLOCK_SIZE; - final static int CACHE_SIZE = 4; - final static long CACHE_EXPIRY_MS = 200; - static Configuration conf = null; - static MiniDFSCluster cluster = null; - static FileSystem fs = null; - static SocketCache cache; - - static final Path testFile = new Path("/testConnCache.dat"); - static byte 
authenticData[] = null; - - static BlockReaderTestUtil util = null; - /** * A mock Answer to remember the BlockReader used. * * It verifies that all invocation to DFSInputStream.getBlockReader() - * use the same socket. + * use the same peer. */ private class MockGetBlockReader implements Answer { public RemoteBlockReader2 reader = null; - private Socket sock = null; + private Peer peer = null; @Override public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { RemoteBlockReader2 prevReader = reader; reader = (RemoteBlockReader2) invocation.callRealMethod(); - if (sock == null) { - sock = reader.dnSock; + if (peer == null) { + peer = reader.getPeer(); } else if (prevReader != null) { - assertSame("DFSInputStream should use the same socket", - sock, reader.dnSock); + Assert.assertSame("DFSInputStream should use the same peer", + peer, reader.getPeer()); } return reader; } } - @BeforeClass - public static void setupCluster() throws Exception { - final int REPLICATION_FACTOR = 1; - - /* create a socket cache. There is only one socket cache per jvm */ - cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS); - - util = new BlockReaderTestUtil(REPLICATION_FACTOR); - cluster = util.getCluster(); - conf = util.getConf(); - fs = cluster.getFileSystem(); - - authenticData = util.writeFile(testFile, FILE_SIZE / 1024); - } - - /** * (Optionally) seek to position, read and verify data. * @@ -117,9 +81,10 @@ public class TestConnCache { long pos, byte[] buffer, int offset, - int length) + int length, + byte[] authenticData) throws IOException { - assertTrue("Test buffer too small", buffer.length >= offset + length); + Assert.assertTrue("Test buffer too small", buffer.length >= offset + length); if (pos >= 0) in.seek(pos); @@ -129,7 +94,7 @@ public class TestConnCache { while (length > 0) { int cnt = in.read(buffer, offset, length); - assertTrue("Error in read", cnt > 0); + Assert.assertTrue("Error in read", cnt > 0); offset += cnt; length -= cnt; } @@ -144,116 +109,23 @@ public class TestConnCache { } } - /** - * Test the SocketCache itself. - */ - @Test - public void testSocketCache() throws Exception { - // Make a client - InetSocketAddress nnAddr = - new InetSocketAddress("localhost", cluster.getNameNodePort()); - DFSClient client = new DFSClient(nnAddr, conf); - - // Find out the DN addr - LocatedBlock block = - client.getNamenode().getBlockLocations( - testFile.toString(), 0, FILE_SIZE) - .getLocatedBlocks().get(0); - DataNode dn = util.getDataNode(block); - InetSocketAddress dnAddr = dn.getXferAddress(); - - - // Make some sockets to the DN - Socket[] dnSockets = new Socket[CACHE_SIZE]; - for (int i = 0; i < dnSockets.length; ++i) { - dnSockets[i] = client.socketFactory.createSocket( - dnAddr.getAddress(), dnAddr.getPort()); - } - - - // Insert a socket to the NN - Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); - cache.put(nnSock, null); - assertSame("Read the write", nnSock, cache.get(nnAddr).sock); - cache.put(nnSock, null); - - // Insert DN socks - for (Socket dnSock : dnSockets) { - cache.put(dnSock, null); - } - - assertEquals("NN socket evicted", null, cache.get(nnAddr)); - assertTrue("Evicted socket closed", nnSock.isClosed()); - - // Lookup the DN socks - for (Socket dnSock : dnSockets) { - assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock); - dnSock.close(); - } - - assertEquals("Cache is empty", 0, cache.size()); - } - - - /** - * Test the SocketCache expiry. 
- * Verify that socket cache entries expire after the set - * expiry time. - */ - @Test - public void testSocketCacheExpiry() throws Exception { - // Make a client - InetSocketAddress nnAddr = - new InetSocketAddress("localhost", cluster.getNameNodePort()); - DFSClient client = new DFSClient(nnAddr, conf); - - // Find out the DN addr - LocatedBlock block = - client.getNamenode().getBlockLocations( - testFile.toString(), 0, FILE_SIZE) - .getLocatedBlocks().get(0); - DataNode dn = util.getDataNode(block); - InetSocketAddress dnAddr = dn.getXferAddress(); - - - // Make some sockets to the DN and put in cache - Socket[] dnSockets = new Socket[CACHE_SIZE]; - for (int i = 0; i < dnSockets.length; ++i) { - dnSockets[i] = client.socketFactory.createSocket( - dnAddr.getAddress(), dnAddr.getPort()); - cache.put(dnSockets[i], null); - } - - // Client side still has the sockets cached - assertEquals(CACHE_SIZE, client.socketCache.size()); - - //sleep for a second and see if it expired - Thread.sleep(CACHE_EXPIRY_MS + 1000); - - // Client side has no sockets cached - assertEquals(0, client.socketCache.size()); - - //sleep for another second and see if - //the daemon thread runs fine on empty cache - Thread.sleep(CACHE_EXPIRY_MS + 1000); - } - - /** * Read a file served entirely from one DN. Seek around and read from * different offsets. And verify that they all use the same socket. - * - * @throws java.io.IOException + * @throws Exception */ @Test @SuppressWarnings("unchecked") - public void testReadFromOneDN() throws IOException { - LOG.info("Starting testReadFromOneDN()"); + public void testReadFromOneDN() throws Exception { + BlockReaderTestUtil util = new BlockReaderTestUtil(1, + new HdfsConfiguration()); + final Path testFile = new Path("/testConnCache.dat"); + byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024); DFSClient client = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); - DFSInputStream in = spy(client.open(testFile.toString())); + new InetSocketAddress("localhost", + util.getCluster().getNameNodePort()), util.getConf()); + DFSInputStream in = Mockito.spy(client.open(testFile.toString())); LOG.info("opened " + testFile.toString()); - byte[] dataBuf = new byte[BLOCK_SIZE]; MockGetBlockReader answer = new MockGetBlockReader(); @@ -270,18 +142,15 @@ public class TestConnCache { Matchers.anyString()); // Initial read - pread(in, 0, dataBuf, 0, dataBuf.length); + pread(in, 0, dataBuf, 0, dataBuf.length, authenticData); // Read again and verify that the socket is the same - pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length); - pread(in, 1024, dataBuf, 0, dataBuf.length); - pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read - pread(in, 64, dataBuf, 0, dataBuf.length / 2); + pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length, + authenticData); + pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData); + // No seek; just read + pread(in, -1, dataBuf, 0, dataBuf.length, authenticData); + pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData); in.close(); } - - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdown(); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java index c8ab6e002fa..9ef0f093ffa 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -92,13 +93,13 @@ public class TestDataTransferKeepalive { DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); // Clients that write aren't currently re-used. - assertEquals(0, dfsClient.socketCache.size()); + assertEquals(0, dfsClient.peerCache.size()); assertXceiverCount(0); // Reads the file, so we should get a // cached socket, and should have an xceiver on the other side. DFSTestUtil.readFile(fs, TEST_FILE); - assertEquals(1, dfsClient.socketCache.size()); + assertEquals(1, dfsClient.peerCache.size()); assertXceiverCount(1); // Sleep for a bit longer than the keepalive timeout @@ -109,13 +110,13 @@ public class TestDataTransferKeepalive { // The socket is still in the cache, because we don't // notice that it's closed until we try to read // from it again. - assertEquals(1, dfsClient.socketCache.size()); + assertEquals(1, dfsClient.peerCache.size()); // Take it out of the cache - reading should // give an EOF. - Socket s = dfsClient.socketCache.get(dnAddr).sock; - assertNotNull(s); - assertEquals(-1, NetUtils.getInputStream(s).read()); + Peer peer = dfsClient.peerCache.get(dn.getDatanodeId()); + assertNotNull(peer); + assertEquals(-1, peer.getInputStream().read()); } /** @@ -174,14 +175,14 @@ public class TestDataTransferKeepalive { } DFSClient client = ((DistributedFileSystem)fs).dfs; - assertEquals(5, client.socketCache.size()); + assertEquals(5, client.peerCache.size()); // Let all the xceivers timeout Thread.sleep(1500); assertXceiverCount(0); // Client side still has the sockets cached - assertEquals(5, client.socketCache.size()); + assertEquals(5, client.peerCache.size()); // Reading should not throw an exception. DFSTestUtil.readFile(fs, TEST_FILE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java new file mode 100644 index 00000000000..f7fb128bb1b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +/** + * This class tests disabling client connection caching in a single node + * mini-cluster. + */ +public class TestDisableConnCache { + static final Log LOG = LogFactory.getLog(TestDisableConnCache.class); + + static final int BLOCK_SIZE = 4096; + static final int FILE_SIZE = 3 * BLOCK_SIZE; + + /** + * Test that the socket cache can be disabled by setting the capacity to + * 0. Regression test for HDFS-3365. + * @throws Exception + */ + @Test + public void testDisableCache() throws Exception { + HdfsConfiguration confWithoutCache = new HdfsConfiguration(); + // Configure a new instance with no peer caching, ensure that it doesn't + // cache anything + confWithoutCache.setInt( + DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0); + BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache); + final Path testFile = new Path("/testConnCache.dat"); + util.writeFile(testFile, FILE_SIZE / 1024); + FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf()); + try { + DFSTestUtil.readFile(fsWithoutCache, testFile); + assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size()); + } finally { + fsWithoutCache.close(); + util.shutdown(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java new file mode 100644 index 00000000000..0953c410b03 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
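TestDisableConnCache above turns the cache off by setting its capacity to zero. A minimal client-side sketch of the same configuration, using the key name taken from the test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DisablePeerCacheExample {   // hypothetical example class
      public static Configuration newConf() {
        // Capacity 0 disables client-side peer caching (regression-tested by HDFS-3365).
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
        return conf;
      }
    }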
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; +import java.util.HashSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.net.Peer; +import org.junit.Test; + +public class TestPeerCache { + static final Log LOG = LogFactory.getLog(TestPeerCache.class); + + private static final int CAPACITY = 3; + private static final int EXPIRY_PERIOD = 20; + private static PeerCache cache = + PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD); + + private static class FakePeer implements Peer { + private boolean closed = false; + + private DatanodeID dnId; + + public FakePeer(DatanodeID dnId) { + this.dnId = dnId; + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getReceiveBufferSize() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return false; + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public String getRemoteAddressString() { + return dnId.getInfoAddr(); + } + + @Override + public String getLocalAddressString() { + return "127.0.0.1:123"; + } + + @Override + public InputStream getInputStream() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String toString() { + return "FakePeer(dnId=" + dnId + ")"; + } + } + + @Test + public void testAddAndRetrieve() throws Exception { + DatanodeID dnId = new DatanodeID("192.168.0.1", + "fakehostname", "fake_storage_id", + 100, 101, 102); + FakePeer peer = new FakePeer(dnId); + cache.put(dnId, peer); + assertTrue(!peer.isClosed()); + assertEquals(1, cache.size()); + assertEquals(peer, cache.get(dnId)); + assertEquals(0, cache.size()); + cache.clear(); + } + + @Test + public void testExpiry() throws Exception { + DatanodeID dnIds[] = new DatanodeID[CAPACITY]; + FakePeer peers[] = new FakePeer[CAPACITY]; + for (int i = 0; i < CAPACITY; ++i) { + dnIds[i] = new DatanodeID("192.168.0.1", + "fakehostname_" + i, "fake_storage_id", + 100, 101, 102); + peers[i] = new FakePeer(dnIds[i]); + } + for (int i = 0; i < CAPACITY; ++i) { + cache.put(dnIds[i], peers[i]); + } + // Check that the peers are cached + assertEquals(CAPACITY, cache.size()); + + // Wait for the peers to expire + Thread.sleep(EXPIRY_PERIOD * 50); + assertEquals(0, cache.size()); + + // make sure that the peers were closed when they were expired + for (int i = 0; i < CAPACITY; ++i) { + assertTrue(peers[i].isClosed()); + } + + // sleep for another second and see if + // the daemon thread runs fine on empty cache + Thread.sleep(EXPIRY_PERIOD * 50); + 
cache.clear(); + } + + @Test + public void testEviction() throws Exception { + DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1]; + FakePeer peers[] = new FakePeer[CAPACITY + 1]; + for (int i = 0; i < dnIds.length; ++i) { + dnIds[i] = new DatanodeID("192.168.0.1", + "fakehostname_" + i, "fake_storage_id_" + i, + 100, 101, 102); + peers[i] = new FakePeer(dnIds[i]); + } + for (int i = 0; i < CAPACITY; ++i) { + cache.put(dnIds[i], peers[i]); + } + // Check that the peers are cached + assertEquals(CAPACITY, cache.size()); + + // Add another entry and check that the first entry was evicted + cache.put(dnIds[CAPACITY], peers[CAPACITY]); + assertEquals(CAPACITY, cache.size()); + assertSame(null, cache.get(dnIds[0])); + + // Make sure that the other entries are still there + for (int i = 1; i < CAPACITY; ++i) { + Peer peer = cache.get(dnIds[i]); + assertSame(peers[i], peer); + assertTrue(!peer.isClosed()); + peer.close(); + } + assertEquals(1, cache.size()); + cache.clear(); + } + + @Test + public void testMultiplePeersWithSameDnId() throws Exception { + DatanodeID dnId = new DatanodeID("192.168.0.1", + "fakehostname", "fake_storage_id", + 100, 101, 102); + HashSet peers = new HashSet(CAPACITY); + for (int i = 0; i < CAPACITY; ++i) { + FakePeer peer = new FakePeer(dnId); + peers.add(peer); + cache.put(dnId, peer); + } + // Check that all of the peers ended up in the cache + assertEquals(CAPACITY, cache.size()); + while (!peers.isEmpty()) { + Peer peer = cache.get(dnId); + assertTrue(peer != null); + assertTrue(!peer.isClosed()); + peers.remove(peer); + } + assertEquals(0, cache.size()); + cache.clear(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java deleted file mode 100644 index 255d408f824..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
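TestPeerCache above exercises the PeerCache API this patch introduces (getInstance, put, get, size, clear). For orientation, a hypothetical reuse pattern under the same assumptions the tests make; the connect() helper is invented:

    package org.apache.hadoop.hdfs;          // same package as the tests above

    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.io.IOUtils;

    // Illustrative reuse pattern: try the cache first, put the peer back when done.
    class CachedPeerExample {
      static Peer getOrConnect(PeerCache cache, DatanodeID dnId) throws Exception {
        Peer peer = cache.get(dnId);         // null if nothing is cached for this DN
        return (peer != null) ? peer : connect(dnId);
      }

      static void release(PeerCache cache, DatanodeID dnId, Peer peer, boolean reuse) {
        if (reuse) {
          cache.put(dnId, peer);             // keep it for a later read from the same DN
        } else {
          IOUtils.cleanup(null, peer);       // otherwise just close it
        }
      }

      private static Peer connect(DatanodeID dnId) {
        throw new UnsupportedOperationException("hypothetical dial-out helper");
      }
    }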
- */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.spy; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.security.token.Token; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * This class tests the client connection caching in a single node - * mini-cluster. - */ -public class TestSocketCache { - static final Log LOG = LogFactory.getLog(TestSocketCache.class); - - static final int BLOCK_SIZE = 4096; - static final int FILE_SIZE = 3 * BLOCK_SIZE; - final static int CACHE_SIZE = 4; - final static long CACHE_EXPIRY_MS = 200; - static Configuration conf = null; - static MiniDFSCluster cluster = null; - static FileSystem fs = null; - static SocketCache cache; - - static final Path testFile = new Path("/testConnCache.dat"); - static byte authenticData[] = null; - - static BlockReaderTestUtil util = null; - - - /** - * A mock Answer to remember the BlockReader used. - * - * It verifies that all invocation to DFSInputStream.getBlockReader() - * use the same socket. - */ - private class MockGetBlockReader implements Answer { - public RemoteBlockReader2 reader = null; - private Socket sock = null; - - @Override - public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { - RemoteBlockReader2 prevReader = reader; - reader = (RemoteBlockReader2) invocation.callRealMethod(); - if (sock == null) { - sock = reader.dnSock; - } else if (prevReader != null) { - assertSame("DFSInputStream should use the same socket", - sock, reader.dnSock); - } - return reader; - } - } - - @BeforeClass - public static void setupCluster() throws Exception { - final int REPLICATION_FACTOR = 1; - - HdfsConfiguration confWithoutCache = new HdfsConfiguration(); - confWithoutCache.setInt( - DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0); - util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache); - cluster = util.getCluster(); - conf = util.getConf(); - - authenticData = util.writeFile(testFile, FILE_SIZE / 1024); - } - - - /** - * (Optionally) seek to position, read and verify data. - * - * Seek to specified position if pos is non-negative. 
- */ - private void pread(DFSInputStream in, - long pos, - byte[] buffer, - int offset, - int length) - throws IOException { - assertTrue("Test buffer too small", buffer.length >= offset + length); - - if (pos >= 0) - in.seek(pos); - - LOG.info("Reading from file of size " + in.getFileLength() + - " at offset " + in.getPos()); - - while (length > 0) { - int cnt = in.read(buffer, offset, length); - assertTrue("Error in read", cnt > 0); - offset += cnt; - length -= cnt; - } - - // Verify - for (int i = 0; i < length; ++i) { - byte actual = buffer[i]; - byte expect = authenticData[(int)pos + i]; - assertEquals("Read data mismatch at file offset " + (pos + i) + - ". Expects " + expect + "; got " + actual, - actual, expect); - } - } - - - /** - * Test that the socket cache can be disabled by setting the capacity to - * 0. Regression test for HDFS-3365. - */ - @Test - public void testDisableCache() throws IOException { - LOG.info("Starting testDisableCache()"); - - // Configure a new instance with no caching, ensure that it doesn't - // cache anything - - FileSystem fsWithoutCache = FileSystem.newInstance(conf); - try { - DFSTestUtil.readFile(fsWithoutCache, testFile); - assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size()); - } finally { - fsWithoutCache.close(); - } - } - - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdown(); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 0dad0648879..bc1affbc1f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -147,9 +148,10 @@ public class TestBlockTokenWithDFS { "test-blockpoolid", block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(s).setBlock(block).setFile(file). + setPeer(TcpPeerServer.peerFromSocket(s)). + setBlock(block).setFile(file). setBlockToken(lblock.getBlockToken()).setStartOffset(0). 
- setLen(-1)); + setLen(-1).setDatanodeID(nodes[0])); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { assertFalse("OP_READ_BLOCK: access token is invalid, " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 10bcc3d7da0..0a20ffcd85d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -284,8 +285,9 @@ public class TestDataNodeVolumeFailure { setFile(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block).setBlockToken(lblock.getBlockToken()). - setSocket(s)); - blockReader.close(); + setPeer(TcpPeerServer.peerFromSocket(s)). + setDatanodeID(datanode)); + blockReader.close(null); } /**
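Several call sites in this patch (JspHelper, NamenodeFsck, and the tests above) now share the same shape: wrap the connected socket in a Peer, pass it together with a DatanodeID to BlockReaderFactory.Params, and hand close() a PeerCache, or null to simply close the Peer. A condensed, hypothetical sketch of that shape:

    import java.net.Socket;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.BlockReaderFactory;
    import org.apache.hadoop.hdfs.DFSClient.Conf;
    import org.apache.hadoop.hdfs.net.TcpPeerServer;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    // Condensed sketch of the new read path; the caller supplies a connected socket.
    class ReadOneBlock {
      static byte[] read(Configuration conf, Socket sock, DatanodeID dn,
          ExtendedBlock block, Token<BlockTokenIdentifier> token,
          String file, int len) throws Exception {
        BlockReader reader = BlockReaderFactory.newBlockReader(
            new BlockReaderFactory.Params(new Conf(conf)).
                setPeer(TcpPeerServer.peerFromSocket(sock)).
                setDatanodeID(dn).setFile(file).setBlock(block).
                setBlockToken(token).setStartOffset(0).setLen(len));
        try {
          byte[] buf = new byte[len];
          reader.readAll(buf, 0, len);
          return buf;
        } finally {
          reader.close(null);   // null PeerCache: just close the underlying Peer
        }
      }
    }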