HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.
This commit is contained in:
parent
9264b7e119
commit
f0f6f1c7e9
|
@ -36,11 +36,13 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -429,4 +431,28 @@ public class DFSUtilClient {
|
|||
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
|
||||
return df.format(date);
|
||||
}
|
||||
|
||||
private static final Map<String, Boolean> localAddrMap = Collections
|
||||
.synchronizedMap(new HashMap<String, Boolean>());
|
||||
|
||||
public static boolean isLocalAddress(InetSocketAddress targetAddr) {
|
||||
InetAddress addr = targetAddr.getAddress();
|
||||
Boolean cached = localAddrMap.get(addr.getHostAddress());
|
||||
if (cached != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Address " + targetAddr +
|
||||
(cached ? " is local" : " is not local"));
|
||||
}
|
||||
return cached;
|
||||
}
|
||||
|
||||
boolean local = NetUtils.isLocalAddress(addr);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Address " + targetAddr +
|
||||
(local ? " is local" : " is not local"));
|
||||
}
|
||||
localAddrMap.put(addr.getHostAddress(), local);
|
||||
return local;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,17 +29,15 @@ import java.io.RandomAccessFile;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -50,7 +48,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockMetadataHeader {
|
||||
private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
BlockMetadataHeader.class);
|
||||
|
||||
public static final short VERSION = 1;
|
||||
|
||||
|
@ -62,8 +61,6 @@ public class BlockMetadataHeader {
|
|||
private final short version;
|
||||
private DataChecksum checksum = null;
|
||||
|
||||
private static final HdfsConfiguration conf = new HdfsConfiguration();
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockMetadataHeader(short version, DataChecksum checksum) {
|
||||
this.checksum = checksum;
|
||||
|
@ -84,11 +81,12 @@ public class BlockMetadataHeader {
|
|||
* Read the checksum header from the meta file.
|
||||
* @return the data checksum obtained from the header.
|
||||
*/
|
||||
public static DataChecksum readDataChecksum(File metaFile) throws IOException {
|
||||
public static DataChecksum readDataChecksum(File metaFile, int bufSize)
|
||||
throws IOException {
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
in = new DataInputStream(new BufferedInputStream(
|
||||
new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
|
||||
new FileInputStream(metaFile), bufSize));
|
||||
return readDataChecksum(in, metaFile);
|
||||
} finally {
|
||||
IOUtils.closeStream(in);
|
|
@ -22,15 +22,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import java.io.Closeable;
|
||||
import java.nio.MappedByteBuffer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A reference to a memory-mapped region used by an HDFS client.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ClientMmap implements Closeable {
|
||||
static final Log LOG = LogFactory.getLog(ClientMmap.class);
|
||||
static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class);
|
||||
|
||||
/**
|
||||
* A reference to the block replica which this mmap relates to.
|
|
@ -23,10 +23,8 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
|
@ -36,8 +34,12 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DomainSocketFactory {
|
||||
private static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
DomainSocketFactory.class);
|
||||
|
||||
public enum PathState {
|
||||
UNUSABLE(false, false),
|
||||
|
@ -145,7 +147,7 @@ public class DomainSocketFactory {
|
|||
return PathInfo.NOT_CONFIGURED;
|
||||
}
|
||||
// UNIX domain sockets can only be used to talk to local peers
|
||||
if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
|
||||
if (!DFSUtilClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
|
||||
String escapedPath = DomainSocket.getEffectivePath(
|
||||
conf.getDomainSocketPath(), addr.getPort());
|
||||
PathState status = pathMap.getIfPresent(escapedPath);
|
|
@ -34,8 +34,6 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
|
||||
|
@ -46,7 +44,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.util.IOUtilsClient;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.DomainSocketWatcher;
|
||||
|
@ -59,6 +57,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The ShortCircuitCache tracks things which the client needs to access
|
||||
* HDFS block files via short-circuit.
|
||||
|
@ -68,7 +69,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ShortCircuitCache implements Closeable {
|
||||
public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
ShortCircuitCache.class);
|
||||
|
||||
/**
|
||||
* Expiry thread which makes sure that the file descriptors get closed
|
||||
|
@ -189,14 +191,11 @@ public class ShortCircuitCache implements Closeable {
|
|||
}
|
||||
final DfsClientShm shm = (DfsClientShm)slot.getShm();
|
||||
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
|
||||
DomainSocket sock = null;
|
||||
DataOutputStream out = null;
|
||||
final String path = shmSock.getPath();
|
||||
boolean success = false;
|
||||
try {
|
||||
sock = DomainSocket.connect(path);
|
||||
out = new DataOutputStream(
|
||||
new BufferedOutputStream(sock.getOutputStream()));
|
||||
try (DomainSocket sock = DomainSocket.connect(path);
|
||||
DataOutputStream out = new DataOutputStream(
|
||||
new BufferedOutputStream(sock.getOutputStream()))) {
|
||||
new Sender(out).releaseShortCircuitFds(slot.getSlotId());
|
||||
DataInputStream in = new DataInputStream(sock.getInputStream());
|
||||
ReleaseShortCircuitAccessResponseProto resp =
|
||||
|
@ -221,7 +220,6 @@ public class ShortCircuitCache implements Closeable {
|
|||
} else {
|
||||
shm.getEndpointShmManager().shutdown(shm);
|
||||
}
|
||||
IOUtils.cleanup(LOG, sock, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -890,7 +888,7 @@ public class ShortCircuitCache implements Closeable {
|
|||
maxNonMmappedEvictableLifespanMs = 0;
|
||||
maxEvictableMmapedSize = 0;
|
||||
// Close and join cacheCleaner thread.
|
||||
IOUtils.cleanup(LOG, cacheCleaner);
|
||||
IOUtilsClient.cleanup(LOG, cacheCleaner);
|
||||
// Purge all replicas.
|
||||
while (true) {
|
||||
Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
|
||||
|
@ -933,7 +931,7 @@ public class ShortCircuitCache implements Closeable {
|
|||
LOG.error("Interrupted while waiting for CleanerThreadPool "
|
||||
+ "to terminate", e);
|
||||
}
|
||||
IOUtils.cleanup(LOG, shmManager);
|
||||
IOUtilsClient.cleanup(LOG, shmManager);
|
||||
}
|
||||
|
||||
@VisibleForTesting // ONLY for testing
|
|
@ -23,12 +23,11 @@ import java.nio.MappedByteBuffer;
|
|||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.ExtendedBlockId;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||
import org.apache.hadoop.hdfs.util.IOUtilsClient;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -36,6 +35,9 @@ import org.apache.hadoop.util.Time;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A ShortCircuitReplica object contains file descriptors for a block that
|
||||
* we are reading via short-circuit local reads.
|
||||
|
@ -46,7 +48,8 @@ import com.google.common.base.Preconditions;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ShortCircuitReplica {
|
||||
public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
ShortCircuitCache.class);
|
||||
|
||||
/**
|
||||
* Identifies this ShortCircuitReplica object.
|
||||
|
@ -253,7 +256,7 @@ public class ShortCircuitReplica {
|
|||
suffix += " munmapped.";
|
||||
}
|
||||
}
|
||||
IOUtils.cleanup(LOG, dataStream, metaStream);
|
||||
IOUtilsClient.cleanup(LOG, dataStream, metaStream);
|
||||
if (slot != null) {
|
||||
cache.scheduleSlotReleaser(slot);
|
||||
if (LOG.isTraceEnabled()) {
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class IOUtilsClient {
|
||||
/**
|
||||
* Close the Closeable objects and <b>ignore</b> any {@link IOException} or
|
||||
* null pointers. Must only be used for cleanup in exception handlers.
|
||||
*
|
||||
* @param log the log to record problems to at debug level. Can be null.
|
||||
* @param closeables the objects to close
|
||||
*/
|
||||
public static void cleanup(Logger log, java.io.Closeable... closeables) {
|
||||
for (java.io.Closeable c : closeables) {
|
||||
if (c != null) {
|
||||
try {
|
||||
c.close();
|
||||
} catch(Throwable e) {
|
||||
if (log != null && log.isDebugEnabled()) {
|
||||
log.debug("Exception in closing " + c, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -493,6 +493,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8846. Add a unit test for INotify functionality across a layout
|
||||
version upgrade (Zhe Zhang via Colin P. McCabe)
|
||||
|
||||
HDFS-8951. Move the shortcircuit package to hdfs-client.
|
||||
(Mingliang Liu via wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
|
||||
}
|
||||
if (!DFSClient.isLocalAddress(inetSocketAddress)) {
|
||||
if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
|
||||
"the address " + inetSocketAddress + " is not local");
|
||||
|
|
|
@ -40,7 +40,6 @@ import java.net.URI;
|
|||
import java.net.UnknownHostException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -707,30 +706,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
}
|
||||
}
|
||||
|
||||
private static final Map<String, Boolean> localAddrMap = Collections
|
||||
.synchronizedMap(new HashMap<String, Boolean>());
|
||||
|
||||
public static boolean isLocalAddress(InetSocketAddress targetAddr) {
|
||||
InetAddress addr = targetAddr.getAddress();
|
||||
Boolean cached = localAddrMap.get(addr.getHostAddress());
|
||||
if (cached != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Address " + targetAddr +
|
||||
(cached ? " is local" : " is not local"));
|
||||
}
|
||||
return cached;
|
||||
}
|
||||
|
||||
boolean local = NetUtils.isLocalAddress(addr);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Address " + targetAddr +
|
||||
(local ? " is local" : " is not local"));
|
||||
}
|
||||
localAddrMap.put(addr.getHostAddress(), local);
|
||||
return local;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a delegation token
|
||||
* @param token the token to cancel
|
||||
|
|
|
@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
|||
checksum.getBytesPerChecksum(),
|
||||
checksum.getChecksumSize());
|
||||
|
||||
this.isLocal = DFSClient.isLocalAddress(NetUtils.
|
||||
this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
|
||||
createSocketAddr(datanodeID.getXferAddr()));
|
||||
|
||||
this.peer = peer;
|
||||
|
|
|
@ -290,7 +290,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
|||
DataChecksum checksum, boolean verifyChecksum,
|
||||
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
|
||||
DatanodeID datanodeID, PeerCache peerCache) {
|
||||
this.isLocal = DFSClient.isLocalAddress(NetUtils.
|
||||
this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
|
||||
createSocketAddr(datanodeID.getXferAddr()));
|
||||
// Path is used only for printing block and file information in debug
|
||||
this.peer = peer;
|
||||
|
|
|
@ -310,7 +310,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
|
||||
blockChooserImpl);
|
||||
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
|
||||
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
|
||||
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
|
||||
deletingBlock = new HashMap<String, Set<Long>>();
|
||||
|
||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||
|
@ -852,20 +852,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
*/
|
||||
static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
|
||||
File srcFile, File destRoot, boolean calculateChecksum,
|
||||
int smallBufferSize) throws IOException {
|
||||
int smallBufferSize, final Configuration conf) throws IOException {
|
||||
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
|
||||
final File dstFile = new File(destDir, srcFile.getName());
|
||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
||||
return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum,
|
||||
smallBufferSize);
|
||||
smallBufferSize, conf);
|
||||
}
|
||||
|
||||
static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
|
||||
File dstFile, boolean calculateChecksum,
|
||||
int smallBufferSize)
|
||||
int smallBufferSize, final Configuration conf)
|
||||
throws IOException {
|
||||
if (calculateChecksum) {
|
||||
computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize);
|
||||
computeChecksum(srcMeta, dstMeta, srcFile, smallBufferSize, conf);
|
||||
} else {
|
||||
try {
|
||||
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
|
||||
|
@ -929,7 +929,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
File[] blockFiles = copyBlockFiles(block.getBlockId(),
|
||||
block.getGenerationStamp(), oldMetaFile, oldBlockFile,
|
||||
targetVolume.getTmpDir(block.getBlockPoolId()),
|
||||
replicaInfo.isOnTransientStorage(), smallBufferSize);
|
||||
replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
|
||||
|
||||
ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
|
||||
replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
|
||||
|
@ -958,9 +958,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* @throws IOException
|
||||
*/
|
||||
private static void computeChecksum(File srcMeta, File dstMeta,
|
||||
File blockFile, int smallBufferSize)
|
||||
File blockFile, int smallBufferSize, final Configuration conf)
|
||||
throws IOException {
|
||||
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
|
||||
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
|
||||
DFSUtil.getIoFileBufferSize(conf));
|
||||
final byte[] data = new byte[1 << 16];
|
||||
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
|
||||
|
||||
|
@ -2518,7 +2519,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
|
||||
return copyBlockFiles(replicaInfo.getMetaFile(),
|
||||
replicaInfo.getBlockFile(),
|
||||
dstMetaFile, dstBlockFile, true, smallBufferSize);
|
||||
dstMetaFile, dstBlockFile, true, smallBufferSize, conf);
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
|
@ -53,6 +54,8 @@ class RamDiskAsyncLazyPersistService {
|
|||
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
||||
|
||||
private final DataNode datanode;
|
||||
private final Configuration conf;
|
||||
|
||||
private final ThreadGroup threadGroup;
|
||||
private Map<File, ThreadPoolExecutor> executors
|
||||
= new HashMap<File, ThreadPoolExecutor>();
|
||||
|
@ -65,8 +68,9 @@ class RamDiskAsyncLazyPersistService {
|
|||
* The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
|
||||
* disk operations.
|
||||
*/
|
||||
RamDiskAsyncLazyPersistService(DataNode datanode) {
|
||||
RamDiskAsyncLazyPersistService(DataNode datanode, Configuration conf) {
|
||||
this.datanode = datanode;
|
||||
this.conf = conf;
|
||||
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
|
@ -240,7 +244,7 @@ class RamDiskAsyncLazyPersistService {
|
|||
// No FsDatasetImpl lock for the file copy
|
||||
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
|
||||
blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
|
||||
smallBufferSize);
|
||||
smallBufferSize, conf);
|
||||
|
||||
// Lock FsDataSetImpl during onCompleteLazyPersist callback
|
||||
dataset.onCompleteLazyPersist(bpId, blockId,
|
||||
|
|
Loading…
Reference in New Issue