HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-26 14:02:48 -07:00
parent a4d9acc51d
commit c992bcf9c1
15 changed files with 129 additions and 73 deletions

View File

@ -36,11 +36,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -429,4 +431,28 @@ public class DFSUtilClient {
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH); new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
return df.format(date); return df.format(date);
} }
private static final Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
public static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr +
(cached ? " is local" : " is not local"));
}
return cached;
}
boolean local = NetUtils.isLocalAddress(addr);
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr +
(local ? " is local" : " is not local"));
}
localAddrMap.put(addr.getHostAddress(), local);
return local;
}
} }

View File

@ -29,17 +29,15 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
@ -50,7 +48,8 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class BlockMetadataHeader { public class BlockMetadataHeader {
private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class); private static final Logger LOG = LoggerFactory.getLogger(
BlockMetadataHeader.class);
public static final short VERSION = 1; public static final short VERSION = 1;
@ -62,8 +61,6 @@ public class BlockMetadataHeader {
private final short version; private final short version;
private DataChecksum checksum = null; private DataChecksum checksum = null;
private static final HdfsConfiguration conf = new HdfsConfiguration();
@VisibleForTesting @VisibleForTesting
public BlockMetadataHeader(short version, DataChecksum checksum) { public BlockMetadataHeader(short version, DataChecksum checksum) {
this.checksum = checksum; this.checksum = checksum;
@ -84,11 +81,12 @@ public class BlockMetadataHeader {
* Read the checksum header from the meta file. * Read the checksum header from the meta file.
* @return the data checksum obtained from the header. * @return the data checksum obtained from the header.
*/ */
public static DataChecksum readDataChecksum(File metaFile) throws IOException { public static DataChecksum readDataChecksum(File metaFile, int bufSize)
throws IOException {
DataInputStream in = null; DataInputStream in = null;
try { try {
in = new DataInputStream(new BufferedInputStream( in = new DataInputStream(new BufferedInputStream(
new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf))); new FileInputStream(metaFile), bufSize));
return readDataChecksum(in, metaFile); return readDataChecksum(in, metaFile);
} finally { } finally {
IOUtils.closeStream(in); IOUtils.closeStream(in);

View File

@ -22,15 +22,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
import java.io.Closeable; import java.io.Closeable;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import org.apache.commons.logging.Log; import org.slf4j.Logger;
import org.apache.commons.logging.LogFactory; import org.slf4j.LoggerFactory;
/** /**
* A reference to a memory-mapped region used by an HDFS client. * A reference to a memory-mapped region used by an HDFS client.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientMmap implements Closeable { public class ClientMmap implements Closeable {
static final Log LOG = LogFactory.getLog(ClientMmap.class); static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class);
/** /**
* A reference to the block replica which this mmap relates to. * A reference to the block replica which this mmap relates to.

View File

@ -23,10 +23,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
@ -36,8 +34,12 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DomainSocketFactory { public class DomainSocketFactory {
private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class); private static final Logger LOG = LoggerFactory.getLogger(
DomainSocketFactory.class);
public enum PathState { public enum PathState {
UNUSABLE(false, false), UNUSABLE(false, false),
@ -145,7 +147,7 @@ public class DomainSocketFactory {
return PathInfo.NOT_CONFIGURED; return PathInfo.NOT_CONFIGURED;
} }
// UNIX domain sockets can only be used to talk to local peers // UNIX domain sockets can only be used to talk to local peers
if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED; if (!DFSUtilClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
String escapedPath = DomainSocket.getEffectivePath( String escapedPath = DomainSocket.getEffectivePath(
conf.getDomainSocketPath(), addr.getPort()); conf.getDomainSocketPath(), addr.getPort());
PathState status = pathMap.getIfPresent(escapedPath); PathState status = pathMap.getIfPresent(escapedPath);

View File

@ -34,8 +34,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
@ -46,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher; import org.apache.hadoop.net.unix.DomainSocketWatcher;
@ -59,6 +57,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The ShortCircuitCache tracks things which the client needs to access * The ShortCircuitCache tracks things which the client needs to access
* HDFS block files via short-circuit. * HDFS block files via short-circuit.
@ -68,7 +69,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ShortCircuitCache implements Closeable { public class ShortCircuitCache implements Closeable {
public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); public static final Logger LOG = LoggerFactory.getLogger(
ShortCircuitCache.class);
/** /**
* Expiry thread which makes sure that the file descriptors get closed * Expiry thread which makes sure that the file descriptors get closed
@ -189,14 +191,11 @@ public class ShortCircuitCache implements Closeable {
} }
final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final DomainSocket shmSock = shm.getPeer().getDomainSocket();
DomainSocket sock = null;
DataOutputStream out = null;
final String path = shmSock.getPath(); final String path = shmSock.getPath();
boolean success = false; boolean success = false;
try { try (DomainSocket sock = DomainSocket.connect(path);
sock = DomainSocket.connect(path); DataOutputStream out = new DataOutputStream(
out = new DataOutputStream( new BufferedOutputStream(sock.getOutputStream()))) {
new BufferedOutputStream(sock.getOutputStream()));
new Sender(out).releaseShortCircuitFds(slot.getSlotId()); new Sender(out).releaseShortCircuitFds(slot.getSlotId());
DataInputStream in = new DataInputStream(sock.getInputStream()); DataInputStream in = new DataInputStream(sock.getInputStream());
ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto resp =
@ -221,7 +220,6 @@ public class ShortCircuitCache implements Closeable {
} else { } else {
shm.getEndpointShmManager().shutdown(shm); shm.getEndpointShmManager().shutdown(shm);
} }
IOUtils.cleanup(LOG, sock, out);
} }
} }
} }
@ -890,7 +888,7 @@ public class ShortCircuitCache implements Closeable {
maxNonMmappedEvictableLifespanMs = 0; maxNonMmappedEvictableLifespanMs = 0;
maxEvictableMmapedSize = 0; maxEvictableMmapedSize = 0;
// Close and join cacheCleaner thread. // Close and join cacheCleaner thread.
IOUtils.cleanup(LOG, cacheCleaner); IOUtilsClient.cleanup(LOG, cacheCleaner);
// Purge all replicas. // Purge all replicas.
while (true) { while (true) {
Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry(); Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
@ -933,7 +931,7 @@ public class ShortCircuitCache implements Closeable {
LOG.error("Interrupted while waiting for CleanerThreadPool " LOG.error("Interrupted while waiting for CleanerThreadPool "
+ "to terminate", e); + "to terminate", e);
} }
IOUtils.cleanup(LOG, shmManager); IOUtilsClient.cleanup(LOG, shmManager);
} }
@VisibleForTesting // ONLY for testing @VisibleForTesting // ONLY for testing

View File

@ -23,12 +23,11 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode; import java.nio.channels.FileChannel.MapMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -36,6 +35,9 @@ import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A ShortCircuitReplica object contains file descriptors for a block that * A ShortCircuitReplica object contains file descriptors for a block that
* we are reading via short-circuit local reads. * we are reading via short-circuit local reads.
@ -46,7 +48,8 @@ import com.google.common.base.Preconditions;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ShortCircuitReplica { public class ShortCircuitReplica {
public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class); public static final Logger LOG = LoggerFactory.getLogger(
ShortCircuitCache.class);
/** /**
* Identifies this ShortCircuitReplica object. * Identifies this ShortCircuitReplica object.
@ -253,7 +256,7 @@ public class ShortCircuitReplica {
suffix += " munmapped."; suffix += " munmapped.";
} }
} }
IOUtils.cleanup(LOG, dataStream, metaStream); IOUtilsClient.cleanup(LOG, dataStream, metaStream);
if (slot != null) { if (slot != null) {
cache.scheduleSlotReleaser(slot); cache.scheduleSlotReleaser(slot);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.slf4j.Logger;
import java.io.IOException;
public class IOUtilsClient {
/**
* Close the Closeable objects and <b>ignore</b> any {@link IOException} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param log the log to record problems to at debug level. Can be null.
* @param closeables the objects to close
*/
public static void cleanup(Logger log, java.io.Closeable... closeables) {
for (java.io.Closeable c : closeables) {
if (c != null) {
try {
c.close();
} catch(Throwable e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing " + c, e);
}
}
}
}
}
}

View File

@ -838,6 +838,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8846. Add a unit test for INotify functionality across a layout HDFS-8846. Add a unit test for INotify functionality across a layout
version upgrade (Zhe Zhang via Colin P. McCabe) version upgrade (Zhe Zhang via Colin P. McCabe)
HDFS-8951. Move the shortcircuit package to hdfs-client.
(Mingliang Liu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(this + ": trying to construct BlockReaderLocalLegacy"); LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
} }
if (!DFSClient.isLocalAddress(inetSocketAddress)) { if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " + LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
"the address " + inetSocketAddress + " is not local"); "the address " + inetSocketAddress + " is not local");

View File

@ -39,7 +39,6 @@ import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -703,30 +702,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
} }
private static final Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
public static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr +
(cached ? " is local" : " is not local"));
}
return cached;
}
boolean local = NetUtils.isLocalAddress(addr);
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr +
(local ? " is local" : " is not local"));
}
localAddrMap.put(addr.getHostAddress(), local);
return local;
}
/** /**
* Cancel a delegation token * Cancel a delegation token
* @param token the token to cancel * @param token the token to cancel

View File

@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
checksum.getBytesPerChecksum(), checksum.getBytesPerChecksum(),
checksum.getChecksumSize()); checksum.getChecksumSize());
this.isLocal = DFSClient.isLocalAddress(NetUtils. this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr())); createSocketAddr(datanodeID.getXferAddr()));
this.peer = peer; this.peer = peer;

View File

@ -290,7 +290,7 @@ public class RemoteBlockReader2 implements BlockReader {
DataChecksum checksum, boolean verifyChecksum, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID, PeerCache peerCache) { DatanodeID datanodeID, PeerCache peerCache) {
this.isLocal = DFSClient.isLocalAddress(NetUtils. this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr())); createSocketAddr(datanodeID.getXferAddr()));
// Path is used only for printing block and file information in debug // Path is used only for printing block and file information in debug
this.peer = peer; this.peer = peer;

View File

@ -305,7 +305,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl); blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
deletingBlock = new HashMap<String, Set<Long>>(); deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@ -847,20 +847,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
File srcFile, File destRoot, boolean calculateChecksum, File srcFile, File destRoot, boolean calculateChecksum,
int smallBufferSize) throws IOException { int smallBufferSize, final Configuration conf) throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName()); final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum, return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
smallBufferSize); smallBufferSize, conf);
} }
static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
File dstFile, boolean calculateChecksum, File dstFile, boolean calculateChecksum,
int smallBufferSize) int smallBufferSize, final Configuration conf)
throws IOException { throws IOException {
if (calculateChecksum) { if (calculateChecksum) {
computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize); computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf);
} else { } else {
try { try {
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
@ -924,7 +924,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File[] blockFiles = copyBlockFiles(block.getBlockId(), File[] blockFiles = copyBlockFiles(block.getBlockId(),
block.getGenerationStamp(), oldMetaFile, oldBlockFile, block.getGenerationStamp(), oldMetaFile, oldBlockFile,
targetVolume.getTmpDir(block.getBlockPoolId()), targetVolume.getTmpDir(block.getBlockPoolId()),
replicaInfo.isOnTransientStorage(), smallBufferSize); replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
ReplicaInfo newReplicaInfo = new ReplicaInPipeline( ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
@ -953,9 +953,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @throws IOException * @throws IOException
*/ */
private static void computeChecksum(File srcMeta, File dstMeta, private static void computeChecksum(File srcMeta, File dstMeta,
File blockFile, int smallBufferSize) File blockFile, int smallBufferSize, final Configuration conf)
throws IOException { throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
DFSUtil.getIoFileBufferSize(conf));
final byte[] data = new byte[1 << 16]; final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)]; final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
@ -2513,7 +2514,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
return copyBlockFiles(replicaInfo.getMetaFile(), return copyBlockFiles(replicaInfo.getMetaFile(),
replicaInfo.getBlockFile(), replicaInfo.getBlockFile(),
dstMetaFile, dstBlockFile, true, smallBufferSize); dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
} }
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -53,6 +54,8 @@ class RamDiskAsyncLazyPersistService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode; private final DataNode datanode;
private final Configuration conf;
private final ThreadGroup threadGroup; private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>(); = new HashMap<File, ThreadPoolExecutor>();
@ -65,8 +68,9 @@ class RamDiskAsyncLazyPersistService {
* The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
* disk operations. * disk operations.
*/ */
RamDiskAsyncLazyPersistService(DataNode datanode) { RamDiskAsyncLazyPersistService(DataNode datanode, Configuration conf) {
this.datanode = datanode; this.datanode = datanode;
this.conf = conf;
this.threadGroup = new ThreadGroup(getClass().getSimpleName()); this.threadGroup = new ThreadGroup(getClass().getSimpleName());
} }
@ -240,7 +244,7 @@ class RamDiskAsyncLazyPersistService {
// No FsDatasetImpl lock for the file copy // No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles( File targetFiles[] = FsDatasetImpl.copyBlockFiles(
blockId, genStamp, metaFile, blockFile, lazyPersistDir, true, blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
smallBufferSize); smallBufferSize, conf);
// Lock FsDataSetImpl during onCompleteLazyPersist callback // Lock FsDataSetImpl during onCompleteLazyPersist callback
dataset.onCompleteLazyPersist(bpId, blockId, dataset.onCompleteLazyPersist(bpId, blockId,