HADOOP-13680. fs.s3a.readahead.range to use getLongBytes. Contributed by Abhishek Modi.
This commit is contained in:
parent
ce13463e7a
commit
c4ccafdaff
|
@ -1044,8 +1044,10 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.multipart.size</name>
|
<name>fs.s3a.multipart.size</name>
|
||||||
<value>104857600</value>
|
<value>100M</value>
|
||||||
<description>How big (in bytes) to split upload or copy operations up into.</description>
|
<description>How big (in bytes) to split upload or copy operations up into.
|
||||||
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
@ -1053,7 +1055,8 @@
|
||||||
<value>2147483647</value>
|
<value>2147483647</value>
|
||||||
<description>How big (in bytes) to split upload or copy operations up into.
|
<description>How big (in bytes) to split upload or copy operations up into.
|
||||||
This also controls the partition size in renamed files, as rename() involves
|
This also controls the partition size in renamed files, as rename() involves
|
||||||
copying the source file(s)
|
copying the source file(s).
|
||||||
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -1109,8 +1112,9 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.block.size</name>
|
<name>fs.s3a.block.size</name>
|
||||||
<value>33554432</value>
|
<value>32M</value>
|
||||||
<description>Block size to use when reading files using s3a: file system.
|
<description>Block size to use when reading files using s3a: file system.
|
||||||
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -1172,10 +1176,12 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.readahead.range</name>
|
<name>fs.s3a.readahead.range</name>
|
||||||
<value>65536</value>
|
<value>64K</value>
|
||||||
<description>Bytes to read ahead during a seek() before closing and
|
<description>Bytes to read ahead during a seek() before closing and
|
||||||
re-opening the S3 HTTP connection. This option will be overridden if
|
re-opening the S3 HTTP connection. This option will be overridden if
|
||||||
any call to setReadahead() is made to an open stream.</description>
|
any call to setReadahead() is made to an open stream.
|
||||||
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -182,10 +182,11 @@ public class S3AFileSystem extends FileSystem {
|
||||||
MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
|
MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
|
||||||
|
|
||||||
//check but do not store the block size
|
//check but do not store the block size
|
||||||
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
|
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
|
||||||
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
||||||
|
|
||||||
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
|
readAhead = longBytesOption(conf, READAHEAD_RANGE,
|
||||||
|
DEFAULT_READAHEAD_RANGE, 0);
|
||||||
storageStatistics = (S3AStorageStatistics)
|
storageStatistics = (S3AStorageStatistics)
|
||||||
GlobalStorageStatistics.INSTANCE
|
GlobalStorageStatistics.INSTANCE
|
||||||
.put(S3AStorageStatistics.NAME,
|
.put(S3AStorageStatistics.NAME,
|
||||||
|
@ -356,6 +357,16 @@ public class S3AFileSystem extends FileSystem {
|
||||||
return s3;
|
return s3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the read ahead range value used by this filesystem
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getReadAheadRange() {
|
||||||
|
return readAhead;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the input policy for this FS instance.
|
* Get the input policy for this FS instance.
|
||||||
* @return the input policy
|
* @return the input policy
|
||||||
|
@ -1881,7 +1892,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public long getDefaultBlockSize() {
|
public long getDefaultBlockSize() {
|
||||||
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -509,6 +509,27 @@ public final class S3AUtils {
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a long option >= the minimum allowed value, supporting memory
|
||||||
|
* prefixes K,M,G,T,P.
|
||||||
|
* @param conf configuration
|
||||||
|
* @param key key to look up
|
||||||
|
* @param defVal default value
|
||||||
|
* @param min minimum value
|
||||||
|
* @return the value
|
||||||
|
* @throws IllegalArgumentException if the value is below the minimum
|
||||||
|
*/
|
||||||
|
static long longBytesOption(Configuration conf,
|
||||||
|
String key,
|
||||||
|
long defVal,
|
||||||
|
long min) {
|
||||||
|
long v = conf.getLongBytes(key, defVal);
|
||||||
|
Preconditions.checkArgument(v >= min,
|
||||||
|
String.format("Value of %s: %d is below the minimum value %d",
|
||||||
|
key, v, min));
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a size property from the configuration: this property must
|
* Get a size property from the configuration: this property must
|
||||||
* be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
|
* be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
|
||||||
|
@ -521,7 +542,7 @@ public final class S3AUtils {
|
||||||
*/
|
*/
|
||||||
public static long getMultipartSizeProperty(Configuration conf,
|
public static long getMultipartSizeProperty(Configuration conf,
|
||||||
String property, long defVal) {
|
String property, long defVal) {
|
||||||
long partSize = conf.getLong(property, defVal);
|
long partSize = conf.getLongBytes(property, defVal);
|
||||||
if (partSize < MULTIPART_MIN_SIZE) {
|
if (partSize < MULTIPART_MIN_SIZE) {
|
||||||
LOG.warn("{} must be at least 5 MB; configured value is {}",
|
LOG.warn("{} must be at least 5 MB; configured value is {}",
|
||||||
property, partSize);
|
property, partSize);
|
||||||
|
|
|
@ -791,16 +791,20 @@ from placing its declaration on the command line.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.multipart.size</name>
|
<name>fs.s3a.multipart.size</name>
|
||||||
<value>104857600</value>
|
<value>100M</value>
|
||||||
<description>How big (in bytes) to split upload or copy operations up into.
|
<description>How big (in bytes) to split upload or copy operations up into.
|
||||||
This also controls the partition size in renamed files, as rename() involves
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
copying the source file(s)</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.multipart.threshold</name>
|
<name>fs.s3a.multipart.threshold</name>
|
||||||
<value>2147483647</value>
|
<value>2147483647</value>
|
||||||
<description>Threshold before uploads or copies use parallel multipart operations.</description>
|
<description>How big (in bytes) to split upload or copy operations up into.
|
||||||
|
This also controls the partition size in renamed files, as rename() involves
|
||||||
|
copying the source file(s).
|
||||||
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
@ -854,7 +858,7 @@ from placing its declaration on the command line.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.block.size</name>
|
<name>fs.s3a.block.size</name>
|
||||||
<value>33554432</value>
|
<value>32M</value>
|
||||||
<description>Block size to use when reading files using s3a: file system.
|
<description>Block size to use when reading files using s3a: file system.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
@ -888,7 +892,7 @@ from placing its declaration on the command line.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.readahead.range</name>
|
<name>fs.s3a.readahead.range</name>
|
||||||
<value>65536</value>
|
<value>64K</value>
|
||||||
<description>Bytes to read ahead during a seek() before closing and
|
<description>Bytes to read ahead during a seek() before closing and
|
||||||
re-opening the S3 HTTP connection. This option will be overridden if
|
re-opening the S3 HTTP connection. This option will be overridden if
|
||||||
any call to setReadahead() is made to an open stream.</description>
|
any call to setReadahead() is made to an open stream.</description>
|
||||||
|
@ -1058,9 +1062,9 @@ S3 endpoints, as disks are not used for intermediate data storage.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.multipart.size</name>
|
<name>fs.s3a.multipart.size</name>
|
||||||
<value>104857600</value>
|
<value>100M</value>
|
||||||
<description>
|
<description>How big (in bytes) to split upload or copy operations up into.
|
||||||
How big (in bytes) to split upload or copy operations up into.
|
A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
|
@ -380,7 +380,7 @@ public class ITestS3AConfiguration {
|
||||||
byte[] file = ContractTestUtils.toAsciiByteArray("test file");
|
byte[] file = ContractTestUtils.toAsciiByteArray("test file");
|
||||||
ContractTestUtils.writeAndRead(fs,
|
ContractTestUtils.writeAndRead(fs,
|
||||||
new Path("/path/style/access/testFile"), file, file.length,
|
new Path("/path/style/access/testFile"), file, file.length,
|
||||||
conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
|
(int) conf.getLongBytes(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
|
||||||
} catch (final AWSS3IOException e) {
|
} catch (final AWSS3IOException e) {
|
||||||
LOG.error("Caught exception: ", e);
|
LOG.error("Caught exception: ", e);
|
||||||
// Catch/pass standard path style access behaviour when live bucket
|
// Catch/pass standard path style access behaviour when live bucket
|
||||||
|
@ -451,6 +451,17 @@ public class ITestS3AConfiguration {
|
||||||
tmp1.getParent(), tmp2.getParent());
|
tmp1.getParent(), tmp2.getParent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadAheadRange() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.set(Constants.READAHEAD_RANGE, "300K");
|
||||||
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
|
assertNotNull(fs);
|
||||||
|
long readAheadRange = fs.getReadAheadRange();
|
||||||
|
assertNotNull(readAheadRange);
|
||||||
|
assertEquals("Read Ahead Range Incorrect.", 300 * 1024, readAheadRange);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUsernameFromUGI() throws Throwable {
|
public void testUsernameFromUGI() throws Throwable {
|
||||||
final String alice = "alice";
|
final String alice = "alice";
|
||||||
|
|
Loading…
Reference in New Issue