HADOOP-13658. Replace config key literal strings with names I: hadoop common. Contributed by Chen Liang
(cherry picked from commit 9a44a832a9)

parent 69f91d8c48
commit ecb5f282ea
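Every hunk below applies the same refactoring: a hard-coded configuration key string (and its inline magic-number default) is replaced by the matching constant pair from CommonConfigurationKeys / CommonConfigurationKeysPublic. As a minimal sketch of the before/after shape, assuming a hypothetical helper class (the key and default constants are the real ones this patch imports):

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
    import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;

    // Hypothetical class used only to illustrate the refactoring pattern.
    public class BufferSizeExample {

      // Before: literal key string and a magic-number default, repeated at each call site.
      static int bufferSizeOld(Configuration conf) {
        return conf.getInt("io.file.buffer.size", 4096);
      }

      // After: shared constants keep the key name and default in one place
      // and let the compiler catch typos in the key.
      static int bufferSizeNew(Configuration conf) {
        return conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
      }
    }

The same substitution is made for the FTP, bloom-filter, map-index, SequenceFile, compression-codec, SOCKS and hash-type keys in the files below.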
@@ -2850,7 +2850,8 @@ public abstract class FileSystem extends Configured implements Closeable {
       }
       fs.key = key;
       map.put(key, fs);
-      if (conf.getBoolean("fs.automatic.close", true)) {
+      if (conf.getBoolean(
+          FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
         toAutoClose.add(key);
       }
       return fs;

@@ -105,13 +105,13 @@ public class FTPFileSystem extends FileSystem {
     // get port information from uri, (overrides info in conf)
     int port = uri.getPort();
     port = (port == -1) ? FTP.DEFAULT_PORT : port;
-    conf.setInt("fs.ftp.host.port", port);
+    conf.setInt(FS_FTP_HOST_PORT, port);

     // get user/password information from URI (overrides info in conf)
     String userAndPassword = uri.getUserInfo();
     if (userAndPassword == null) {
-      userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
-          .get("fs.ftp.password." + host, null));
+      userAndPassword = (conf.get(FS_FTP_USER_PREFIX + host, null) + ":" + conf
+          .get(FS_FTP_PASSWORD_PREFIX + host, null));
     }
     String[] userPasswdInfo = userAndPassword.split(":");
     Preconditions.checkState(userPasswdInfo.length > 1,

@@ -46,6 +46,8 @@ import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;

@@ -497,7 +499,8 @@ abstract class CommandWithDestination extends FsCommand {
           FsPermission.getFileDefault().applyUMask(
               FsPermission.getUMask(getConf())),
           createFlags,
-          getConf().getInt("io.file.buffer.size", 4096),
+          getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+              IO_FILE_BUFFER_SIZE_DEFAULT),
           lazyPersist ? 1 : getDefaultReplication(item.path),
           getDefaultBlockSize(),
           null,

@@ -37,6 +37,11 @@ import org.apache.hadoop.util.bloom.Filter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.hadoop.util.hash.Hash;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_KEY;
+
 /**
  * This class extends {@link MapFile} and provides very much the same
  * functionality. However, it uses dynamic Bloom filters to provide

@@ -159,13 +164,15 @@ public class BloomMapFile {
     }

     private synchronized void initBloomFilter(Configuration conf) {
-      numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
+      numKeys = conf.getInt(
+          IO_MAPFILE_BLOOM_SIZE_KEY, IO_MAPFILE_BLOOM_SIZE_DEFAULT);
       // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
       // single key, where <code> is the number of hash functions,
       // <code>n</code> is the number of keys and <code>c</code> is the desired
       // max. error rate.
       // Our desired error rate is by default 0.005, i.e. 0.5%
-      float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
+      float errorRate = conf.getFloat(
+          IO_MAPFILE_BLOOM_ERROR_RATE_KEY, IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT);
       vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
           Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
       bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,

@@ -37,6 +37,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ChunkedArrayList;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 /**
  * An utility class for I/O related functionality.
  */

@@ -103,7 +106,8 @@ public class IOUtils {
    */
   public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
     throws IOException {
-    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
+    copyBytes(in, out, conf.getInt(
+        IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), true);
   }

   /**

@@ -117,7 +121,8 @@ public class IOUtils {
    */
   public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
     throws IOException {
-    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close);
+    copyBytes(in, out, conf.getInt(
+        IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), close);
   }

   /**

@@ -38,6 +38,9 @@ import org.apache.hadoop.util.Options;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_KEY;
+
 /** A file-based map from keys to values.
  *
  * <p>A map is a directory containing two files, the <code>data</code> file,

@@ -395,7 +398,8 @@ public class MapFile {
         Options.getOption(ComparatorOption.class, opts);
       WritableComparator comparator =
           comparatorOption == null ? null : comparatorOption.getValue();
-      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
+      INDEX_SKIP = conf.getInt(
+          IO_MAP_INDEX_SKIP_KEY, IO_MAP_INDEX_SKIP_DEFAULT);
       open(dir, comparator, conf, opts);
     }

@@ -51,6 +51,13 @@ import org.apache.hadoop.util.MergeSort;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Time;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;
+
 /**
  * <code>SequenceFile</code>s are flat files consisting of binary key/value
  * pairs.

@@ -1513,7 +1520,9 @@ public class SequenceFile {
         Option... options) throws IOException {
       super(conf, options);
       compressionBlockSize =
-          conf.getInt("io.seqfile.compress.blocksize", 1000000);
+          conf.getInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY,
+              IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT
+          );
       keySerializer.close();
       keySerializer.open(keyBuffer);
       uncompressedValSerializer.close();

@@ -1637,7 +1646,7 @@ public class SequenceFile {

   /** Get the configured buffer size */
   private static int getBufferSize(Configuration conf) {
-    return conf.getInt("io.file.buffer.size", 4096);
+    return conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
   }

   /** Reads key/value pairs from a sequence-format file. */

@@ -2655,7 +2664,8 @@ public class SequenceFile {

     private void handleChecksumException(ChecksumException e)
       throws IOException {
-      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
+      if (this.conf.getBoolean(
+          IO_SKIP_CHECKSUM_ERRORS_KEY, IO_SKIP_CHECKSUM_ERRORS_DEFAULT)) {
         LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
         sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
       } else {

@@ -35,6 +35,9 @@ import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
 import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 /**
  * This class provides output and input streams for bzip2 compression
  * and decompression. It uses the native bzip2 library on the system

@@ -120,7 +123,8 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
       Compressor compressor) throws IOException {
     return Bzip2Factory.isNativeBzip2Loaded(conf) ?
       new CompressorStream(out, compressor,
-          conf.getInt("io.file.buffer.size", 4*1024)) :
+          conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+              IO_FILE_BUFFER_SIZE_DEFAULT)) :
       new BZip2CompressionOutputStream(out);
   }

@@ -174,7 +178,8 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
       Decompressor decompressor) throws IOException {
     return Bzip2Factory.isNativeBzip2Loaded(conf) ?
       new DecompressorStream(in, decompressor,
-          conf.getInt("io.file.buffer.size", 4*1024)) :
+          conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+              IO_FILE_BUFFER_SIZE_DEFAULT)) :
       new BZip2CompressionInputStream(in);
   }

@@ -31,6 +31,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {

@@ -60,7 +63,8 @@ public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
       Compressor compressor)
     throws IOException {
     return new CompressorStream(out, compressor,
-        conf.getInt("io.file.buffer.size", 4*1024));
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+            IO_FILE_BUFFER_SIZE_DEFAULT));
   }

   @Override

@@ -85,7 +89,8 @@ public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
       Decompressor decompressor)
     throws IOException {
     return new DecompressorStream(in, decompressor,
-        conf.getInt("io.file.buffer.size", 4*1024));
+        conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+            IO_FILE_BUFFER_SIZE_DEFAULT));
   }

   @Override

@@ -27,6 +27,8 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
 import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;

 /**

@@ -117,8 +119,8 @@ public class GzipCodec extends DefaultCodec {
   throws IOException {
     return (compressor != null) ?
            new CompressorStream(out, compressor,
-                                conf.getInt("io.file.buffer.size",
-                                            4*1024)) :
+                                conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+                                    IO_FILE_BUFFER_SIZE_DEFAULT)) :
            createOutputStream(out);
   }

@@ -151,7 +153,8 @@ public class GzipCodec extends DefaultCodec {
       decompressor = createDecompressor();  // always succeeds (or throws)
     }
     return new DecompressorStream(in, decompressor,
-                                  conf.getInt("io.file.buffer.size", 4*1024));
+                                  conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
+                                      IO_FILE_BUFFER_SIZE_DEFAULT));
   }

   @Override

@@ -36,6 +36,10 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;

+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 /**
  * Compression related stuff.
  */

@@ -124,7 +128,8 @@ final class Compression {
       } else {
         bis1 = downStream;
       }
-      conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+      conf.setInt(IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+          IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
       CompressionInputStream cis =
           codec.createInputStream(bis1, decompressor);
       BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);

@@ -146,7 +151,8 @@ final class Compression {
       } else {
         bos1 = downStream;
       }
-      conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+      conf.setInt(IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+          IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
       CompressionOutputStream cos =
           codec.createOutputStream(bos1, compressor);
       BufferedOutputStream bos2 =

@@ -175,7 +181,7 @@ final class Compression {
         int downStreamBufferSize) throws IOException {
       // Set the internal buffer size to read from down stream.
       if (downStreamBufferSize > 0) {
-        codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+        codec.getConf().setInt(IO_FILE_BUFFER_SIZE_KEY, downStreamBufferSize);
       }
       CompressionInputStream cis =
           codec.createInputStream(downStream, decompressor);

@@ -193,7 +199,7 @@ final class Compression {
       } else {
         bos1 = downStream;
       }
-      codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
+      codec.getConf().setInt(IO_FILE_BUFFER_SIZE_KEY, 32 * 1024);
       CompressionOutputStream cos =
           codec.createOutputStream(bos1, compressor);
       BufferedOutputStream bos2 =

@@ -31,6 +31,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY;
+
 /**
  * Specialized SocketFactory to create sockets with a SOCKS proxy
  */

@@ -133,7 +135,7 @@ public class SocksSocketFactory extends SocketFactory implements
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    String proxyStr = conf.get("hadoop.socks.server");
+    String proxyStr = conf.get(HADOOP_SOCKS_SERVER_KEY);
     if ((proxyStr != null) && (proxyStr.length() > 0)) {
       setProxy(proxyStr);
     }

@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 /**
  * A class that provides a line reader from an input stream.
  * Depending on the constructor used, lines will either be terminated by:

@@ -89,7 +91,7 @@ public class LineReader implements Closeable {
    * @throws IOException
    */
   public LineReader(InputStream in, Configuration conf) throws IOException {
-    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+    this(in, conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE));
   }

   /**

@@ -136,7 +138,7 @@ public class LineReader implements Closeable {
   public LineReader(InputStream in, Configuration conf,
       byte[] recordDelimiterBytes) throws IOException {
     this.in = in;
-    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    this.bufferSize = conf.getInt(IO_FILE_BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE);
     this.buffer = new byte[this.bufferSize];
     this.recordDelimiterBytes = recordDelimiterBytes;
   }

@@ -22,6 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_UTIL_HASH_TYPE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_UTIL_HASH_TYPE_KEY;
+
 /**
  * This class represents a common API for hashing functions.
  */

@@ -59,7 +62,8 @@ public abstract class Hash {
    * @return one of the predefined constants
    */
  public static int getHashType(Configuration conf) {
-    String name = conf.get("hadoop.util.hash.type", "murmur");
+    String name = conf.get(HADOOP_UTIL_HASH_TYPE_KEY,
+        HADOOP_UTIL_HASH_TYPE_DEFAULT);
    return parseHashType(name);
  }

@@ -47,6 +47,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
 /**
  * Utilities used across test cases.
  */

@@ -55,8 +58,6 @@ public class ContractTestUtils extends Assert {
   private static final Logger LOG =
       LoggerFactory.getLogger(ContractTestUtils.class);

-  public static final String IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
-
   // For scale testing, we can repeatedly write small chunk data to generate
   // a large file.
   public static final String IO_CHUNK_BUFFER_SIZE = "io.chunk.buffer.size";

@@ -150,8 +151,8 @@ public class ContractTestUtils extends Assert {
     FSDataOutputStream out = fs.create(path,
                                        overwrite,
                                        fs.getConf()
-                                         .getInt(IO_FILE_BUFFER_SIZE,
-                                                 4096),
+                                         .getInt(IO_FILE_BUFFER_SIZE_KEY,
+                                                 IO_FILE_BUFFER_SIZE_DEFAULT),
                                        (short) 1,
                                        buffersize);
     out.write(src, 0, len);