HADOOP-11584 s3a file block size set to 0 in getFileStatus. (Brahma Reddy Battula via stevel)
This commit is contained in:
parent
eca1588db8
commit
a36cad0522
|
@ -578,6 +578,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+.
|
HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+.
|
||||||
(rkanter)
|
(rkanter)
|
||||||
|
|
||||||
|
HADOOP-11584 s3a file block size set to 0 in getFileStatus.
|
||||||
|
(Brahma Reddy Battula via stevel)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -31,8 +31,9 @@ public class S3AFileStatus extends FileStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Files
|
// Files
|
||||||
public S3AFileStatus(long length, long modification_time, Path path) {
|
public S3AFileStatus(long length, long modification_time, Path path,
|
||||||
super(length, false, 1, 0, modification_time, path);
|
long blockSize) {
|
||||||
|
super(length, false, 1, blockSize, modification_time, path);
|
||||||
isEmptyDirectory = false;
|
isEmptyDirectory = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,10 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class S3AFileSystem extends FileSystem {
|
public class S3AFileSystem extends FileSystem {
|
||||||
|
/**
|
||||||
|
* Default blocksize as used in blocksize and FS status queries
|
||||||
|
*/
|
||||||
|
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
|
||||||
private URI uri;
|
private URI uri;
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
private AmazonS3Client s3;
|
private AmazonS3Client s3;
|
||||||
|
@ -256,7 +260,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
||||||
LinkedBlockingQueue<Runnable> workQueue =
|
LinkedBlockingQueue<Runnable> workQueue =
|
||||||
new LinkedBlockingQueue<Runnable>(maxThreads *
|
new LinkedBlockingQueue<>(maxThreads *
|
||||||
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
|
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
|
||||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
|
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
|
||||||
coreThreads,
|
coreThreads,
|
||||||
|
@ -434,7 +438,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
String srcKey = pathToKey(src);
|
String srcKey = pathToKey(src);
|
||||||
String dstKey = pathToKey(dst);
|
String dstKey = pathToKey(dst);
|
||||||
|
|
||||||
if (srcKey.length() == 0 || dstKey.length() == 0) {
|
if (srcKey.isEmpty() || dstKey.isEmpty()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("rename: src or dst are empty");
|
LOG.debug("rename: src or dst are empty");
|
||||||
}
|
}
|
||||||
|
@ -526,7 +530,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
||||||
new ArrayList<DeleteObjectsRequest.KeyVersion>();
|
new ArrayList<>();
|
||||||
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
||||||
// delete unnecessary fake directory.
|
// delete unnecessary fake directory.
|
||||||
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
|
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
|
||||||
|
@ -640,7 +644,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
request.setMaxKeys(maxKeys);
|
request.setMaxKeys(maxKeys);
|
||||||
|
|
||||||
List<DeleteObjectsRequest.KeyVersion> keys =
|
List<DeleteObjectsRequest.KeyVersion> keys =
|
||||||
new ArrayList<DeleteObjectsRequest.KeyVersion>();
|
new ArrayList<>();
|
||||||
ObjectListing objects = s3.listObjects(request);
|
ObjectListing objects = s3.listObjects(request);
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -663,7 +667,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
objects = s3.listNextBatchOfObjects(objects);
|
objects = s3.listNextBatchOfObjects(objects);
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
} else {
|
} else {
|
||||||
if (keys.size() > 0) {
|
if (!keys.isEmpty()) {
|
||||||
DeleteObjectsRequest deleteRequest =
|
DeleteObjectsRequest deleteRequest =
|
||||||
new DeleteObjectsRequest(bucket).withKeys(keys);
|
new DeleteObjectsRequest(bucket).withKeys(keys);
|
||||||
s3.deleteObjects(deleteRequest);
|
s3.deleteObjects(deleteRequest);
|
||||||
|
@ -751,7 +755,8 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
result.add(new S3AFileStatus(summary.getSize(),
|
result.add(new S3AFileStatus(summary.getSize(),
|
||||||
dateToLong(summary.getLastModified()), keyPath));
|
dateToLong(summary.getLastModified()), keyPath,
|
||||||
|
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Adding: fi: " + keyPath);
|
LOG.debug("Adding: fi: " + keyPath);
|
||||||
}
|
}
|
||||||
|
@ -877,13 +882,16 @@ public class S3AFileSystem extends FileSystem {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Found exact file: fake directory");
|
LOG.debug("Found exact file: fake directory");
|
||||||
}
|
}
|
||||||
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
|
return new S3AFileStatus(true, true,
|
||||||
|
f.makeQualified(uri, workingDir));
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Found exact file: normal file");
|
LOG.debug("Found exact file: normal file");
|
||||||
}
|
}
|
||||||
return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
|
return new S3AFileStatus(meta.getContentLength(),
|
||||||
f.makeQualified(uri, workingDir));
|
dateToLong(meta.getLastModified()),
|
||||||
|
f.makeQualified(uri, workingDir),
|
||||||
|
getDefaultBlockSize(f.makeQualified(uri, workingDir)));
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
|
@ -910,8 +918,10 @@ public class S3AFileSystem extends FileSystem {
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
||||||
|
|
||||||
return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
|
return new S3AFileStatus(meta.getContentLength(),
|
||||||
f.makeQualified(uri, workingDir));
|
dateToLong(meta.getLastModified()),
|
||||||
|
f.makeQualified(uri, workingDir),
|
||||||
|
getDefaultBlockSize(f.makeQualified(uri, workingDir)));
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
|
@ -953,7 +963,8 @@ public class S3AFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir));
|
return new S3AFileStatus(true, false,
|
||||||
|
f.makeQualified(uri, workingDir));
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
|
@ -1128,8 +1139,8 @@ public class S3AFileSystem extends FileSystem {
|
||||||
s3.deleteObject(bucket, key + "/");
|
s3.deleteObject(bucket, key + "/");
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
}
|
}
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException | AmazonServiceException e) {
|
||||||
} catch (AmazonServiceException e) {}
|
}
|
||||||
|
|
||||||
if (f.isRoot()) {
|
if (f.isRoot()) {
|
||||||
break;
|
break;
|
||||||
|
@ -1178,7 +1189,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public long getDefaultBlockSize() {
|
public long getDefaultBlockSize() {
|
||||||
// default to 32MB: large enough to minimize the impact of seeks
|
// default to 32MB: large enough to minimize the impact of seeks
|
||||||
return getConf().getLong(FS_S3A_BLOCK_SIZE, 32 * 1024 * 1024);
|
return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printAmazonServiceException(AmazonServiceException ase) {
|
private void printAmazonServiceException(AmazonServiceException ase) {
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||||
|
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||||
|
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.fileStatsToString;
|
||||||
|
|
||||||
|
public class TestS3ABlocksize extends AbstractFSContractTestBase {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestS3ABlocksize.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AbstractFSContract createContract(Configuration conf) {
|
||||||
|
return new S3AContract(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void testBlockSize() throws Exception {
|
||||||
|
FileSystem fs = getFileSystem();
|
||||||
|
long defaultBlockSize = fs.getDefaultBlockSize();
|
||||||
|
assertEquals("incorrect blocksize",
|
||||||
|
S3AFileSystem.DEFAULT_BLOCKSIZE, defaultBlockSize);
|
||||||
|
long newBlockSize = defaultBlockSize * 2;
|
||||||
|
fs.getConf().setLong(Constants.FS_S3A_BLOCK_SIZE, newBlockSize);
|
||||||
|
|
||||||
|
Path dir = path("testBlockSize");
|
||||||
|
Path file = new Path(dir, "file");
|
||||||
|
createFile(fs, file, true, dataset(1024, 'a', 'z' - 'a'));
|
||||||
|
FileStatus fileStatus = fs.getFileStatus(file);
|
||||||
|
assertEquals("Double default block size in stat(): " + fileStatus,
|
||||||
|
newBlockSize,
|
||||||
|
fileStatus.getBlockSize());
|
||||||
|
|
||||||
|
// check the listing & assert that the block size is picked up by
|
||||||
|
// this route too.
|
||||||
|
boolean found = false;
|
||||||
|
FileStatus[] listing = fs.listStatus(dir);
|
||||||
|
for (FileStatus stat : listing) {
|
||||||
|
LOG.info("entry: {}", stat);
|
||||||
|
if (file.equals(stat.getPath())) {
|
||||||
|
found = true;
|
||||||
|
assertEquals("Double default block size in ls(): " + stat,
|
||||||
|
newBlockSize,
|
||||||
|
stat.getBlockSize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("Did not find " + fileStatsToString(listing, ", "), found);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRootFileStatusHasBlocksize() throws Throwable {
|
||||||
|
FileSystem fs = getFileSystem();
|
||||||
|
FileStatus status = fs.getFileStatus(new Path("/"));
|
||||||
|
assertTrue("Invalid root blocksize",
|
||||||
|
status.getBlockSize() >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
# log4j configuration used during build and unit tests
|
||||||
|
|
||||||
|
log4j.rootLogger=info,stdout
|
||||||
|
log4j.threshhold=ALL
|
||||||
|
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||||
|
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
|
Loading…
Reference in New Issue