From f0f6f1c7e95b2d2a9ecd44a107c48b9ec965339b Mon Sep 17 00:00:00 2001
From: Haohui Mai
Date: Wed, 26 Aug 2015 14:02:48 -0700
Subject: [PATCH] HDFS-8951. Move the shortcircuit package to hdfs-client.
 Contributed by Mingliang Liu.

---
 .../org/apache/hadoop/hdfs/DFSUtilClient.java      | 26 +++++++++++
 .../server/datanode/BlockMetadataHeader.java       | 16 +++----
 .../hadoop/hdfs/shortcircuit/ClientMmap.java       |  6 +--
 .../shortcircuit/DomainSocketFactory.java          | 12 +++--
 .../hdfs/shortcircuit/ShortCircuitCache.java       | 24 +++++-----
 .../shortcircuit/ShortCircuitReplica.java          | 11 +++--
 .../shortcircuit/ShortCircuitReplicaInfo.java      |  0
 .../hadoop/hdfs/util/IOUtilsClient.java            | 46 +++++++++++++++++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 ++
 .../hadoop/hdfs/BlockReaderFactory.java            |  2 +-
 .../org/apache/hadoop/hdfs/DFSClient.java          | 25 ----------
 .../apache/hadoop/hdfs/RemoteBlockReader.java      |  2 +-
 .../hadoop/hdfs/RemoteBlockReader2.java            |  2 +-
 .../fsdataset/impl/FsDatasetImpl.java              | 19 ++++----
 .../impl/RamDiskAsyncLazyPersistService.java       |  8 +++-
 15 files changed, 129 insertions(+), 73 deletions(-)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (93%)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java (93%)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java (95%)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (98%)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java (97%)
 rename hadoop-hdfs-project/{hadoop-hdfs => hadoop-hdfs-client}/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java (100%)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index fa1f5e6b0e6..3d0acb0792d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -36,11 +36,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -429,4 +431,28 @@ public class DFSUtilClient {
         new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
     return df.format(date);
   }
+
+  private static final Map<String, Boolean> localAddrMap = Collections
+      .synchronizedMap(new HashMap<String, Boolean>());
+
+  public static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    Boolean cached = localAddrMap.get(addr.getHostAddress());
+    if (cached != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Address " + targetAddr +
+            (cached ? " is local" : " is not local"));
+      }
+      return cached;
+    }
+
+    boolean local = NetUtils.isLocalAddress(addr);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr +
+          (local ? " is local" : " is not local"));
+    }
+    localAddrMap.put(addr.getHostAddress(), local);
+    return local;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
similarity index 93%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index 4977fd7069e..d2986904f5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -29,17 +29,15 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
@@ -50,7 +48,8 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class BlockMetadataHeader {
-  private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockMetadataHeader.class);
 
   public static final short VERSION = 1;
 
@@ -62,8 +61,6 @@ public class BlockMetadataHeader {
   private final short version;
   private DataChecksum checksum = null;
 
-  private static final HdfsConfiguration conf = new HdfsConfiguration();
-
   @VisibleForTesting
   public BlockMetadataHeader(short version, DataChecksum checksum) {
     this.checksum = checksum;
@@ -84,11 +81,12 @@ public class BlockMetadataHeader {
    * Read the checksum header from the meta file.
    * @return the data checksum obtained from the header.
    */
-  public static DataChecksum readDataChecksum(File metaFile) throws IOException {
+  public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+      throws IOException {
     DataInputStream in = null;
     try {
       in = new DataInputStream(new BufferedInputStream(
-          new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
+          new FileInputStream(metaFile), bufSize));
       return readDataChecksum(in, metaFile);
     } finally {
       IOUtils.closeStream(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
similarity index 93%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
index 8184fdf3f86..2d871fcd606 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
@@ -22,15 +22,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
 
 import java.io.Closeable;
 import java.nio.MappedByteBuffer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A reference to a memory-mapped region used by an HDFS client.
  */
 @InterfaceAudience.Private
 public class ClientMmap implements Closeable {
-  static final Log LOG = LogFactory.getLog(ClientMmap.class);
+  static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class);
 
   /**
    * A reference to the block replica which this mmap relates to.
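
Note on the BlockMetadataHeader change above: with the static HdfsConfiguration removed, callers now supply the metadata read buffer size themselves. A minimal sketch of the new call shape (not part of the patch; the meta file path is a placeholder and conf is assumed to be an existing Configuration, mirroring how FsDatasetImpl derives the size later in this patch):

    File metaFile = new File("/data/dn/current/blk_1001.meta");  // placeholder path
    int bufSize = DFSUtil.getIoFileBufferSize(conf);             // caller-supplied buffer size
    DataChecksum checksum = BlockMetadataHeader.readDataChecksum(metaFile, bufSize);
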
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
similarity index 95%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
index 992d8b41add..6a7d39d9059 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -23,10 +23,8 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -36,8 +34,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class DomainSocketFactory {
-  private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
 
   public enum PathState {
     UNUSABLE(false, false),
@@ -145,7 +147,7 @@ public class DomainSocketFactory {
       return PathInfo.NOT_CONFIGURED;
     }
     // UNIX domain sockets can only be used to talk to local peers
-    if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
+    if (!DFSUtilClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
     String escapedPath = DomainSocket.getEffectivePath(
         conf.getDomainSocketPath(), addr.getPort());
     PathState status = pathMap.getIfPresent(escapedPath);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
similarity index 98%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index 15b8dea8e0e..52c1a6eeb97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -34,8 +34,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
@@ -46,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.DomainSocketWatcher;
@@ -59,6 +57,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The ShortCircuitCache tracks things which the client needs to access
  * HDFS block files via short-circuit.
@@ -68,7 +69,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 @InterfaceAudience.Private
 public class ShortCircuitCache implements Closeable {
-  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ShortCircuitCache.class);
 
   /**
    * Expiry thread which makes sure that the file descriptors get closed
@@ -189,14 +191,11 @@ public class ShortCircuitCache implements Closeable {
       }
       final DfsClientShm shm = (DfsClientShm)slot.getShm();
       final DomainSocket shmSock = shm.getPeer().getDomainSocket();
-      DomainSocket sock = null;
-      DataOutputStream out = null;
       final String path = shmSock.getPath();
       boolean success = false;
-      try {
-        sock = DomainSocket.connect(path);
-        out = new DataOutputStream(
-            new BufferedOutputStream(sock.getOutputStream()));
+      try (DomainSocket sock = DomainSocket.connect(path);
+           DataOutputStream out = new DataOutputStream(
+               new BufferedOutputStream(sock.getOutputStream()))) {
         new Sender(out).releaseShortCircuitFds(slot.getSlotId());
         DataInputStream in = new DataInputStream(sock.getInputStream());
         ReleaseShortCircuitAccessResponseProto resp =
@@ -221,7 +220,6 @@
         } else {
           shm.getEndpointShmManager().shutdown(shm);
         }
-        IOUtils.cleanup(LOG, sock, out);
       }
     }
   }
@@ -890,7 +888,7 @@
     maxNonMmappedEvictableLifespanMs = 0;
     maxEvictableMmapedSize = 0;
     // Close and join cacheCleaner thread.
-    IOUtils.cleanup(LOG, cacheCleaner);
+    IOUtilsClient.cleanup(LOG, cacheCleaner);
     // Purge all replicas.
     while (true) {
       Entry entry = evictable.firstEntry();
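
Note on the SlotReleaser hunk above: the manual socket and stream cleanup is replaced with try-with-resources, so both resources are closed automatically even when the release request fails. The same pattern in isolation (a sketch only; the socket path is a placeholder and slot comes from the enclosing releaser):

    String path = "/var/run/hdfs/dn_socket";  // placeholder domain socket path
    try (DomainSocket sock = DomainSocket.connect(path);
         DataOutputStream out = new DataOutputStream(
             new BufferedOutputStream(sock.getOutputStream()))) {
      new Sender(out).releaseShortCircuitFds(slot.getSlotId());
      // ... read and check the ReleaseShortCircuitAccessResponseProto ...
    }  // sock and out are closed here, in reverse order, even on exceptions
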
@@ -933,7 +931,7 @@
       LOG.error("Interrupted while waiting for CleanerThreadPool "
           + "to terminate", e);
     }
-    IOUtils.cleanup(LOG, shmManager);
+    IOUtilsClient.cleanup(LOG, shmManager);
   }
 
   @VisibleForTesting // ONLY for testing
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
index 1390cf3eee1..37566e2ab60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
@@ -23,12 +23,11 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Time;
@@ -36,6 +35,9 @@ import org.apache.hadoop.util.Time;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A ShortCircuitReplica object contains file descriptors for a block that
  * we are reading via short-circuit local reads.
@@ -46,7 +48,8 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 public class ShortCircuitReplica {
-  public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ShortCircuitCache.class);
 
   /**
    * Identifies this ShortCircuitReplica object.
@@ -253,7 +256,7 @@ public class ShortCircuitReplica {
         suffix += " munmapped.";
       }
     }
-    IOUtils.cleanup(LOG, dataStream, metaStream);
+    IOUtilsClient.cleanup(LOG, dataStream, metaStream);
     if (slot != null) {
       cache.scheduleSlotReleaser(slot);
       if (LOG.isTraceEnabled()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
similarity index 100%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
new file mode 100644
index 00000000000..56f8ecc4175
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+
+public class IOUtilsClient {
+  /**
+   * Close the Closeable objects and ignore any {@link IOException} or
+   * null pointers. Must only be used for cleanup in exception handlers.
+   *
+   * @param log the log to record problems to at debug level. Can be null.
+   * @param closeables the objects to close
+   */
+  public static void cleanup(Logger log, java.io.Closeable... closeables) {
+    for (java.io.Closeable c : closeables) {
+      if (c != null) {
+        try {
+          c.close();
+        } catch(Throwable e) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug("Exception in closing " + c, e);
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5373e669b9c..5332fc2d530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -493,6 +493,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8846. Add a unit test for INotify functionality across a layout
     version upgrade (Zhe Zhang via Colin P. McCabe)
 
+    HDFS-8951. Move the shortcircuit package to hdfs-client.
+    (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
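
Note on the new IOUtilsClient above: it mirrors org.apache.hadoop.io.IOUtils#cleanup but accepts an slf4j Logger, so the relocated client-side classes can keep their slf4j LOG fields. Typical use in a cleanup path (a sketch only; LOG, blockFile and metaFile are assumed from a surrounding class):

    FileInputStream dataIn = null;
    FileInputStream metaIn = null;
    try {
      dataIn = new FileInputStream(blockFile);
      metaIn = new FileInputStream(metaFile);
      // ... read from the streams ...
    } finally {
      // Closes every non-null argument; close failures are only logged at debug level.
      IOUtilsClient.cleanup(LOG, dataIn, metaIn);
    }
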
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index fec6b85ad27..52ba899d0ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     if (LOG.isTraceEnabled()) {
       LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
     }
-    if (!DFSClient.isLocalAddress(inetSocketAddress)) {
+    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
       if (LOG.isTraceEnabled()) {
         LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
             "the address " + inetSocketAddress + " is not local");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 12646b5037a..93c5e009aec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -40,7 +40,6 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -707,30 +706,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
-  private static final Map<String, Boolean> localAddrMap = Collections
-      .synchronizedMap(new HashMap<String, Boolean>());
-
-  public static boolean isLocalAddress(InetSocketAddress targetAddr) {
-    InetAddress addr = targetAddr.getAddress();
-    Boolean cached = localAddrMap.get(addr.getHostAddress());
-    if (cached != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Address " + targetAddr +
-            (cached ? " is local" : " is not local"));
-      }
-      return cached;
-    }
-
-    boolean local = NetUtils.isLocalAddress(addr);
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Address " + targetAddr +
-          (local ? " is local" : " is not local"));
-    }
-    localAddrMap.put(addr.getHostAddress(), local);
-    return local;
-  }
-
   /**
    * Cancel a delegation token
    * @param token the token to cancel
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 05a9f2caa07..015e1549740 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         checksum.getBytesPerChecksum(),
         checksum.getChecksumSize());
 
-    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
         createSocketAddr(datanodeID.getXferAddr()));
 
     this.peer = peer;
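
Note on the reader changes above and below: RemoteBlockReader and RemoteBlockReader2 now decide whether the DataNode is local through the relocated DFSUtilClient helper rather than DFSClient. In isolation (a sketch only; datanodeID is assumed from the reader constructor):

    InetSocketAddress addr =
        NetUtils.createSocketAddr(datanodeID.getXferAddr());
    boolean isLocal = DFSUtilClient.isLocalAddress(addr);  // result is cached per host address
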
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 4c23d363d6b..2a77cb6a84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -290,7 +290,7 @@ public class RemoteBlockReader2 implements BlockReader {
       DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
       DatanodeID datanodeID, PeerCache peerCache) {
-    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
         createSocketAddr(datanodeID.getXferAddr()));
     // Path is used only for printing block and file information in debug
     this.peer = peer;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2de9a381d91..4034ef890f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -310,7 +310,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
-    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -852,20 +852,20 @@ class FsDatasetImpl implements FsDatasetSpi {
    */
   static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
       File srcFile, File destRoot, boolean calculateChecksum,
-      int smallBufferSize) throws IOException {
+      int smallBufferSize, final Configuration conf) throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
-        smallBufferSize);
+        smallBufferSize, conf);
   }
 
   static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
       File dstFile, boolean calculateChecksum,
-      int smallBufferSize)
+      int smallBufferSize, final Configuration conf)
       throws IOException {
     if (calculateChecksum) {
-      computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize);
+      computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf);
     } else {
       try {
         Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
@@ -929,7 +929,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     File[] blockFiles = copyBlockFiles(block.getBlockId(),
         block.getGenerationStamp(), oldMetaFile, oldBlockFile,
         targetVolume.getTmpDir(block.getBlockPoolId()),
-        replicaInfo.isOnTransientStorage(), smallBufferSize);
+        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
     ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
         replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
@@ -958,9 +958,10 @@ class FsDatasetImpl implements FsDatasetSpi {
    * @throws IOException
    */
   private static void computeChecksum(File srcMeta, File dstMeta,
-      File blockFile, int smallBufferSize)
+      File blockFile, int smallBufferSize, final Configuration conf)
       throws IOException {
-    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
+    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
+        DFSUtil.getIoFileBufferSize(conf));
     final byte[] data = new byte[1 << 16];
     final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
 
@@ -2518,7 +2519,7 @@ class FsDatasetImpl implements FsDatasetSpi {
     final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
     return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
-        dstMetaFile, dstBlockFile, true, smallBufferSize);
+        dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
   }
 
   @Override // FsDatasetSpi
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 791a711e49e..bf1fd98d5e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -53,6 +54,8 @@ class RamDiskAsyncLazyPersistService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final Configuration conf;
+
   private final ThreadGroup threadGroup;
   private Map executors = new HashMap();
@@ -65,8 +68,9 @@ class RamDiskAsyncLazyPersistService {
    * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  RamDiskAsyncLazyPersistService(DataNode datanode) {
+  RamDiskAsyncLazyPersistService(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
+    this.conf = conf;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -240,7 +244,7 @@ class RamDiskAsyncLazyPersistService {
       // No FsDatasetImpl lock for the file copy
       File targetFiles[] = FsDatasetImpl.copyBlockFiles(
           blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
-          smallBufferSize);
+          smallBufferSize, conf);
 
       // Lock FsDataSetImpl during onCompleteLazyPersist callback
       dataset.onCompleteLazyPersist(bpId, blockId,
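
Taken together, the datanode-side hunks thread the Configuration explicitly from FsDatasetImpl and RamDiskAsyncLazyPersistService down to the checksum read, replacing the static HdfsConfiguration that BlockMetadataHeader used to hold. A rough sketch of the resulting call chain (not literal patch code; the local variables are illustrative):

    RamDiskAsyncLazyPersistService lazyPersistService =
        new RamDiskAsyncLazyPersistService(datanode, conf);   // conf captured at construction
    // ... later, in the lazy-persist task, conf rides along with the copy:
    File[] targetFiles = FsDatasetImpl.copyBlockFiles(
        blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
        smallBufferSize, conf);
    // ... and ultimately reaches the metadata read as:
    // BlockMetadataHeader.readDataChecksum(srcMeta, DFSUtil.getIoFileBufferSize(conf))
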