HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
This commit is contained in:
parent b46e4ceafd
commit 94cbb6d164
@@ -32,4 +32,28 @@
     <Method name="allocSlot" />
     <Bug pattern="UL_UNRELEASED_LOCK" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DFSInputStream"/>
+    <Field name="tcpReadsDisabledForTesting"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <!--
+    ResponseProccessor is thread that is designed to catch RuntimeException.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
+    <Method name="run" />
+    <Bug pattern="REC_CATCH_EXCEPTION" />
+  </Match>
+
+  <!--
+    We use a separate lock to guard cachingStrategy in order to separate
+    locks for p-reads from seek + read invocations.
+  -->
+  <Match>
+    <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+    <Field name="cachingStrategy" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
 </FindBugsFilter>
@@ -31,8 +31,6 @@ import java.util.List;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -56,7 +54,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
@@ -69,13 +67,16 @@ import org.apache.hadoop.util.Time;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Utility class to create BlockReader implementations.
  */
 @InterfaceAudience.Private
 public class BlockReaderFactory implements ShortCircuitReplicaCreator {
-  static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
+  static final Logger LOG = LoggerFactory.getLogger(BlockReaderFactory.class);
 
   public static class FailureInjector {
     public void injectRequestFileDescriptorsFailure() throws IOException {
@@ -551,14 +552,14 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         if (LOG.isDebugEnabled()) {
           LOG.debug(this + ": closing stale domain peer " + peer, e);
         }
-        IOUtils.cleanup(LOG, peer);
+        IOUtilsClient.cleanup(LOG, peer);
       } else {
         // Handle an I/O error we got when using a newly created socket.
         // We temporarily disable the domain socket path for a few minutes in
         // this case, to prevent wasting more time on it.
         LOG.warn(this + ": I/O error requesting file descriptors. " +
             "Disabling domain socket " + peer.getDomainSocket(), e);
-        IOUtils.cleanup(LOG, peer);
+        IOUtilsClient.cleanup(LOG, peer);
         clientContext.getDomainSocketFactory()
             .disableDomainSocketPath(pathInfo.getPath());
         return null;
@@ -617,7 +618,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         return null;
       } finally {
         if (replica == null) {
-          IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
+          IOUtilsClient.cleanup(DFSClient.LOG, fis[0], fis[1]);
         }
       }
     case ERROR_UNSUPPORTED:
@@ -685,7 +686,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       blockReader = getRemoteBlockReader(peer);
       return blockReader;
     } catch (IOException ioe) {
-      IOUtils.cleanup(LOG, peer);
+      IOUtilsClient.cleanup(LOG, peer);
       if (isSecurityException(ioe)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(this + ": got security exception while constructing " +
@@ -712,7 +713,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       }
     } finally {
       if (blockReader == null) {
-        IOUtils.cleanup(LOG, peer);
+        IOUtilsClient.cleanup(LOG, peer);
      }
     }
   }
@@ -769,7 +770,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       }
     } finally {
       if (blockReader == null) {
-        IOUtils.cleanup(LOG, peer);
+        IOUtilsClient.cleanup(LOG, peer);
       }
     }
   }
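The cleanup calls above switch from org.apache.hadoop.io.IOUtils to the client-side IOUtilsClient so that hadoop-hdfs-client code can pass the slf4j Logger it now uses. A minimal sketch of such a quiet-close helper, under the assumption that it only closes each resource and logs (never rethrows) failures; the class name IOUtilsClientSketch and its body are illustrative, not the implementation added by this commit:

    import java.io.Closeable;
    import java.io.IOException;
    import org.slf4j.Logger;

    final class IOUtilsClientSketch {
      // Close every non-null resource, logging (never throwing) on failure,
      // so cleanup in catch/finally blocks cannot mask the original error.
      static void cleanup(Logger log, Closeable... closeables) {
        for (Closeable c : closeables) {
          if (c == null) {
            continue;
          }
          try {
            c.close();
          } catch (IOException e) {
            if (log != null) {
              log.debug("Exception in closing {}", c, e);
            }
          }
        }
      }
    }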
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -54,13 +52,15 @@ import org.apache.htrace.TraceScope;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class BlockStorageLocationUtil {
 
-  static final Log LOG = LogFactory
-      .getLog(BlockStorageLocationUtil.class);
+  static final Logger LOG = LoggerFactory
+      .getLogger(BlockStorageLocationUtil.class);
 
   /**
    * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
@@ -55,8 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -157,6 +155,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
@@ -192,6 +191,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and
  * perform basic file tasks. It uses the ClientProtocol
@@ -206,7 +208,7 @@ import com.google.common.net.InetAddresses;
 @InterfaceAudience.Private
 public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     DataEncryptionKeyFactory {
-  public static final Log LOG = LogFactory.getLog(DFSClient.class);
+  public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
 
   private final Configuration conf;
@@ -310,7 +312,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.stats = stats;
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
-    this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
+    this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
 
     this.ugi = UserGroupInformation.getCurrentUser();
 
@@ -323,6 +325,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
     ProxyAndInfo<ClientProtocol> proxyInfo = null;
     AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+
     if (numResponseToDrop > 0) {
       // This case is used for testing.
       LOG.warn(HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
@@ -732,7 +735,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     static {
       //Ensure that HDFS Configuration files are loaded before trying to use
       // the renewer.
-      HdfsConfiguration.init();
+      HdfsConfigurationLoader.init();
     }
 
     @Override
@@ -2068,7 +2071,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
       return PBHelperClient.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
     } finally {
-      IOUtils.cleanup(null, pair.in, pair.out);
+      IOUtilsClient.cleanup(null, pair.in, pair.out);
     }
   }
 
@@ -3085,7 +3088,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       return peer;
     } finally {
       if (!success) {
-        IOUtils.cleanup(LOG, peer);
+        IOUtilsClient.cleanup(LOG, peer);
         IOUtils.closeSocket(sock);
       }
     }
@@ -3157,11 +3160,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Probe for encryption enabled on this filesystem.
-   * See {@link DFSUtil#isHDFSEncryptionEnabled(Configuration)}
+   * See {@link DFSUtilClient#isHDFSEncryptionEnabled(Configuration)}
    * @return true if encryption is enabled
    */
   public boolean isHDFSEncryptionEnabled() {
-    return DFSUtil.isHDFSEncryptionEnabled(this.conf);
+    return DFSUtilClient.isHDFSEncryptionEnabled(this.conf);
   }
 
   /**
@@ -30,12 +30,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @VisibleForTesting
 @InterfaceAudience.Private
 public class DFSClientFaultInjector {
-  public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+  private static DFSClientFaultInjector instance = new DFSClientFaultInjector();
   public static AtomicLong exceptionNum = new AtomicLong(0);
 
   public static DFSClientFaultInjector get() {
     return instance;
   }
+  public static void set(DFSClientFaultInjector instance) {
+    DFSClientFaultInjector.instance = instance;
+  }
 
   public boolean corruptPacket() {
     return false;
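With the injector field made private, tests swap implementations only through the new get()/set() accessors; the test hunks further down in this diff do exactly that. An illustrative helper, assuming Mockito is on the test classpath; the class name FaultInjectorSwapExample is hypothetical:

    import org.apache.hadoop.hdfs.DFSClientFaultInjector;
    import org.mockito.Mockito;

    final class FaultInjectorSwapExample {
      // Install a mock injector for the duration of testBody, then restore
      // the original so later tests see the default (no-fault) behaviour.
      static void withMockInjector(Runnable testBody) {
        DFSClientFaultInjector oldInjector = DFSClientFaultInjector.get();
        DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
        try {
          testBody.run();
        } finally {
          DFSClientFaultInjector.set(oldInjector);
        }
      }
    }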
@@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class DFSInotifyEventInputStream {
-  public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
-      .class);
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DFSInotifyEventInputStream.class);
 
   /**
    * The trace sampler to use when making RPCs to the NameNode.
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -590,6 +591,29 @@ public class DFSUtilClient {
     }
   }
 
+  public static int getIoFileBufferSize(Configuration conf) {
+    return conf.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+
+  public static int getSmallBufferSize(Configuration conf) {
+    return Math.min(getIoFileBufferSize(conf) / 2, 512);
+  }
+
+  /**
+   * Probe for HDFS Encryption being enabled; this uses the value of
+   * the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI},
+   * returning true if that property contains a non-empty, non-whitespace
+   * string.
+   * @param conf configuration to probe
+   * @return true if encryption is considered enabled.
+   */
+  public static boolean isHDFSEncryptionEnabled(Configuration conf) {
+    return !conf.getTrimmed(
+        HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
+  }
+
   public static InetSocketAddress getNNAddress(String address) {
     return NetUtils.createSocketAddr(address,
         HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
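These helpers move into DFSUtilClient so that client-only code no longer needs DFSUtil from the hadoop-hdfs server module. A small usage sketch, assuming a plain Configuration; with the default io.file.buffer.size of 4096 the small buffer works out to 512 bytes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtilClient;

    final class BufferSizeExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        int ioBufferSize = DFSUtilClient.getIoFileBufferSize(conf);   // io.file.buffer.size
        int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); // min(ioBufferSize / 2, 512)
        boolean encrypted = DFSUtilClient.isHDFSEncryptionEnabled(conf);
        System.out.println(ioBufferSize + " " + smallBufferSize + " " + encrypted);
      }
    }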
@@ -40,8 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -94,6 +92,9 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /*********************************************************************
  *
  * The DataStreamer class is responsible for sending data packets to the
@@ -117,7 +118,7 @@ import com.google.common.cache.RemovalNotification;
 
 @InterfaceAudience.Private
 class DataStreamer extends Daemon {
-  static final Log LOG = LogFactory.getLog(DataStreamer.class);
+  static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
 
   /**
    * Create a socket for a write pipeline
@@ -1229,7 +1230,7 @@ class DataStreamer extends Daemon {
       unbufOut = saslStreams.out;
       unbufIn = saslStreams.in;
       out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-          DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
+          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
       in = new DataInputStream(unbufIn);
 
       //send the TRANSFER_BLOCK request
@@ -1494,7 +1495,7 @@ class DataStreamer extends Daemon {
         unbufOut = saslStreams.out;
         unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
+            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
         blockReplyStream = new DataInputStream(unbufIn);
 
         //
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Load default HDFS configuration resources.
+ */
+@InterfaceAudience.Private
+class HdfsConfigurationLoader {
+
+  static {
+    // adds the default resources
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
+  /**
+   * This method is here so that when invoked, default resources are added if
+   * they haven't already been previously loaded. Upon loading this class, the
+   * static initializer block above will be executed to add the default
+   * resources. It is safe for this method to be called multiple times
+   * as the static initializer block will only get invoked once.
+   */
+  public static void init() {
+  }
+}
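HdfsConfigurationLoader lets client classes pull hdfs-default.xml and hdfs-site.xml onto the default resource list without depending on HdfsConfiguration, which stays in hadoop-hdfs and also handles key deprecation. A minimal sketch of the intended call pattern; the caller class is hypothetical and, since the loader is package-private, would have to live in org.apache.hadoop.hdfs, as the DFSClient static block earlier in this diff does:

    package org.apache.hadoop.hdfs;

    import org.apache.hadoop.conf.Configuration;

    final class HdfsDefaultsUser {
      static {
        // Class-loading HdfsConfigurationLoader registers hdfs-default.xml and
        // hdfs-site.xml as default resources; init() itself is an intentional no-op.
        HdfsConfigurationLoader.init();
      }

      static long defaultBlockSize() {
        // The defaults are now visible through any plain Configuration instance.
        return new Configuration().getLong("dfs.blocksize", 128 * 1024 * 1024);
      }
    }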
@@ -27,8 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -39,6 +37,8 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -73,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 public class LeaseRenewer {
-  static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
+  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
 
   static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
@@ -629,6 +629,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
     via Colin P. McCabe)
 
+    HDFS-8053. Move DFSIn/OutputStream and related classes to
+    hadoop-hdfs-client. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -73,15 +73,6 @@
     <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
   </Match>
 
-  <!--
-    ResponseProccessor is thread that is designed to catch RuntimeException.
-  -->
-  <Match>
-    <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
-    <Method name="run" />
-    <Bug pattern="REC_CATCH_EXCEPTION" />
-  </Match>
-
   <!--
     lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
     See the comments in BackupImage for justification.
@@ -196,14 +187,4 @@
     <Method name="assertAllResultsEqual" />
     <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
   </Match>
-
-  <!--
-    We use a separate lock to guard cachingStrategy in order to separate
-    locks for p-reads from seek + read invocations.
-  -->
-  <Match>
-    <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
-    <Field name="cachingStrategy" />
-    <Bug pattern="IS2_INCONSISTENT_SYNC" />
-  </Match>
 </FindBugsFilter>
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -1477,27 +1477,4 @@ public class DFSUtil {
     return cryptoProvider;
   }
 
-  public static int getIoFileBufferSize(Configuration conf) {
-    return conf.getInt(
-        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
-        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-  }
-
-  public static int getSmallBufferSize(Configuration conf) {
-    return Math.min(getIoFileBufferSize(conf) / 2, 512);
-  }
-
-  /**
-   * Probe for HDFS Encryption being enabled; this uses the value of
-   * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
-   * returning true if that property contains a non-empty, non-whitespace
-   * string.
-   * @param conf configuration to probe
-   * @return true if encryption is considered enabled.
-   */
-  public static boolean isHDFSEncryptionEnabled(Configuration conf) {
-    return !conf.getTrimmed(
-        DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
-  }
-
 }
@@ -31,9 +31,7 @@ public class HdfsConfiguration extends Configuration {
     addDeprecatedKeys();
 
     // adds the default resources
-    Configuration.addDefaultResource("hdfs-default.xml");
-    Configuration.addDefaultResource("hdfs-site.xml");
-
+    HdfsConfigurationLoader.init();
   }
 
   public HdfsConfiguration() {
@@ -52,9 +50,10 @@ public class HdfsConfiguration extends Configuration {
  * This method is here so that when invoked, HdfsConfiguration is class-loaded if
  * it hasn't already been previously loaded. Upon loading the class, the static
  * initializer block above will be executed to add the deprecated keys and to add
- * the default resources. It is safe for this method to be called multiple times
- * as the static initializer block will only get invoked once.
- *
+ * the default resources via {@link HdfsConfigurationLoader#init()}. It is
+ * safe for this method to be called multiple times as the static initializer
+ * block will only get invoked once.
+ *
  * This replaces the previously, dangerous practice of other classes calling
  * Configuration.addDefaultResource("hdfs-default.xml") directly without loading
  * HdfsConfiguration class first, thereby skipping the key deprecation
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -879,7 +880,7 @@ public class Dispatcher {
       this.saslClient = new SaslDataTransferClient(conf,
           DataTransferSaslUtil.getSaslPropertiesResolver(conf),
           TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
-      this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
+      this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
     }
 
     public DistributedFileSystem getDistributedFileSystem() {
@@ -38,7 +38,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
@@ -248,7 +248,7 @@ class BlockReceiver implements Closeable {
             out.getClass());
       }
       this.checksumOut = new DataOutputStream(new BufferedOutputStream(
-          streams.getChecksumOut(), DFSUtil.getSmallBufferSize(
+          streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
           datanode.getConf())));
       // write data chunk header if creating a new replica
       if (isCreate) {
@@ -34,7 +34,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -111,7 +111,7 @@ class BlockSender implements java.io.Closeable {
   private static final int IO_FILE_BUFFER_SIZE;
   static {
     HdfsConfiguration conf = new HdfsConfiguration();
-    IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
+    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
   }
   private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
       IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
@@ -109,6 +109,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -2186,7 +2187,7 @@ public class DataNode extends ReconfigurableBase
         unbufIn = saslStreams.in;
 
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtil.getSmallBufferSize(conf)));
+            DFSUtilClient.getSmallBufferSize(conf)));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(),
             false, false, true, DataNode.this, null, cachingStrategy);
@@ -49,7 +49,7 @@ import java.util.Arrays;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -134,8 +134,8 @@ class DataXceiver extends Receiver implements Runnable {
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
-    this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(datanode.getConf());
-    this.smallBufferSize = DFSUtil.getSmallBufferSize(datanode.getConf());
+    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf());
+    this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf());
     remoteAddress = peer.getRemoteAddressString();
     final int colonIdx = remoteAddress.indexOf(':');
     remoteAddressWithoutPort =
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
@@ -111,7 +111,7 @@ class BlockPoolSlice {
       }
     }
 
-    this.ioFileBufferSize = DFSUtil.getIoFileBufferSize(conf);
+    this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
 
     this.deleteDuplicateReplicas = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
@@ -59,7 +59,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -268,8 +268,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.datanode = datanode;
     this.dataStorage = storage;
     this.conf = conf;
-    this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
+    this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated =
       conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
@@ -961,7 +961,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File blockFile, int smallBufferSize, final Configuration conf)
       throws IOException {
     final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
-        DFSUtil.getIoFileBufferSize(conf));
+        DFSUtilClient.getIoFileBufferSize(conf));
     final byte[] data = new byte[1 << 16];
     final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
 
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -240,7 +240,7 @@ class RamDiskAsyncLazyPersistService {
     boolean succeeded = false;
     final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
     try (FsVolumeReference ref = this.targetVolume) {
-      int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
+      int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
       // No FsDatasetImpl lock for the file copy
       File targetFiles[] = FsDatasetImpl.copyBlockFiles(
           blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
@@ -43,7 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -89,7 +89,7 @@ public class TransferFsImage {
     connectionFactory = URLConnectionFactory
         .newDefaultURLConnectionFactory(conf);
     isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
-    IO_FILE_BUFFER_SIZE = DFSUtil.getIoFileBufferSize(conf);
+    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
   }
 
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.IOException;
 import java.util.Random;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
@@ -990,7 +990,7 @@ public class DFSTestUtil {
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         NetUtils.getOutputStream(s, writeTimeout),
-        DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
+        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
     final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
 
     // send the request
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
-import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.net.NetUtils;
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.*;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -124,8 +124,8 @@ public class TestClientProtocolForPipelineRecovery {
   public void testPipelineRecoveryForLastBlock() throws IOException {
     DFSClientFaultInjector faultInjector
         = Mockito.mock(DFSClientFaultInjector.class);
-    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
-    DFSClientFaultInjector.instance = faultInjector;
+    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.get();
+    DFSClientFaultInjector.set(faultInjector);
     Configuration conf = new HdfsConfiguration();
 
     conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
@@ -153,7 +153,7 @@ public class TestClientProtocolForPipelineRecovery {
             + " corrupt replicas.");
       }
     } finally {
-      DFSClientFaultInjector.instance = oldInjector;
+      DFSClientFaultInjector.set(oldInjector);
       if (cluster != null) {
         cluster.shutdown();
       }
@@ -76,7 +76,7 @@ public class TestCrcCorruption {
   @Before
   public void setUp() throws IOException {
     faultInjector = Mockito.mock(DFSClientFaultInjector.class);
-    DFSClientFaultInjector.instance = faultInjector;
+    DFSClientFaultInjector.set(faultInjector);
   }
 
   /**
@@ -903,16 +903,16 @@ public class TestDFSUtil {
     Configuration conf = new Configuration(false);
     conf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
     assertFalse("encryption enabled on no provider key",
-        DFSUtil.isHDFSEncryptionEnabled(conf));
+        DFSUtilClient.isHDFSEncryptionEnabled(conf));
     conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
     assertFalse("encryption enabled on empty provider key",
-        DFSUtil.isHDFSEncryptionEnabled(conf));
+        DFSUtilClient.isHDFSEncryptionEnabled(conf));
     conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "\n\t\n");
     assertFalse("encryption enabled on whitespace provider key",
-        DFSUtil.isHDFSEncryptionEnabled(conf));
+        DFSUtilClient.isHDFSEncryptionEnabled(conf));
     conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "http://hadoop.apache.org");
     assertTrue("encryption disabled on valid provider key",
-        DFSUtil.isHDFSEncryptionEnabled(conf));
+        DFSUtilClient.isHDFSEncryptionEnabled(conf));
 
   }
 }
@@ -90,7 +90,7 @@ public class TestFileStatus {
       int fileSize, int blockSize) throws IOException {
     // Create and write a file that contains three blocks of data
     FSDataOutputStream stm = fileSys.create(name, true,
-        DFSUtil.getIoFileBufferSize(conf), (short)repl, (long)blockSize);
+        DFSUtilClient.getIoFileBufferSize(conf), (short)repl, (long)blockSize);
     byte[] buffer = new byte[fileSize];
     Random rand = new Random(seed);
     rand.nextBytes(buffer);
@@ -295,9 +295,8 @@ public class TestPread {
         hedgedReadTimeoutMillis);
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
     // Set up the InjectionHandler
-    DFSClientFaultInjector.instance = Mockito
-        .mock(DFSClientFaultInjector.class);
-    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
     final int sleepMs = 100;
     Mockito.doAnswer(new Answer<Void>() {
       @Override
@@ -371,9 +370,8 @@ public class TestPread {
         initialHedgedReadTimeoutMillis);
 
     // Set up the InjectionHandler
-    DFSClientFaultInjector.instance = Mockito
-        .mock(DFSClientFaultInjector.class);
-    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+    DFSClientFaultInjector injector = DFSClientFaultInjector.get();
     // make preads sleep for 50ms
     Mockito.doAnswer(new Answer<Void>() {
       @Override