HADOOP-11584 s3a file block size set to 0 in getFileStatus. (Brahma Reddy Battula via stevel)

commit 709ff99cff
parent 737bad02d4
Steve Loughran, 2015-02-21 12:02:41 +00:00
5 changed files with 143 additions and 17 deletions
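
Why it matters: `getFileStatus()` reporting a block size of 0 breaks any client that divides by the block size when planning work. A minimal sketch of the failure mode, assuming a hypothetical `computeSplitCount` helper in the style of `FileInputFormat` split planning (not part of this patch):

import org.apache.hadoop.fs.FileStatus;

// Hypothetical sketch, not part of the patch: split planners divide file
// length by block size, so a FileStatus with blockSize == 0 (as S3A returned
// before this fix) throws ArithmeticException here.
class SplitCountSketch {
  static long computeSplitCount(FileStatus stat) {
    long blockSize = stat.getBlockSize(); // 0 for S3A files before the fix
    return (stat.getLen() + blockSize - 1) / blockSize;
  }
}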

File: hadoop-common-project/hadoop-common/CHANGES.txt

@@ -984,6 +984,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11612. Workaround for Curator's ChildReaper requiring Guava 15+.
     (rkanter)
 
+    HADOOP-11584 s3a file block size set to 0 in getFileStatus.
+    (Brahma Reddy Battula via stevel)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

File: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

@@ -31,8 +31,9 @@ public class S3AFileStatus extends FileStatus {
   }
 
   // Files
-  public S3AFileStatus(long length, long modification_time, Path path) {
-    super(length, false, 1, 0, modification_time, path);
+  public S3AFileStatus(long length, long modification_time, Path path,
+      long blockSize) {
+    super(length, false, 1, blockSize, modification_time, path);
     isEmptyDirectory = false;
   }
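
The constructor for plain files now takes an explicit `blockSize`, so callers can no longer create a status that silently carries a block size of 0. A hedged caller sketch (the `newFileStatus` helper is illustrative only, not in the patch):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper, not in the patch: the block size must now be supplied
// explicitly, typically via FileSystem#getDefaultBlockSize(Path).
class StatusFactorySketch {
  static S3AFileStatus newFileStatus(FileSystem fs, Path qualifiedPath,
      long length, long modTime) {
    return new S3AFileStatus(length, modTime, qualifiedPath,
        fs.getDefaultBlockSize(qualifiedPath));
  }
}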

File: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -77,6 +77,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class S3AFileSystem extends FileSystem {
+  /**
+   * Default blocksize as used in blocksize and FS status queries
+   */
+  public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
   private URI uri;
   private Path workingDir;
   private AmazonS3Client s3;
@@ -256,7 +260,7 @@ public class S3AFileSystem extends FileSystem {
     }
     long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
     LinkedBlockingQueue<Runnable> workQueue =
-      new LinkedBlockingQueue<Runnable>(maxThreads *
+      new LinkedBlockingQueue<>(maxThreads *
         conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(
         coreThreads,
@@ -434,7 +438,7 @@ public class S3AFileSystem extends FileSystem {
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
 
-    if (srcKey.length() == 0 || dstKey.length() == 0) {
+    if (srcKey.isEmpty() || dstKey.isEmpty()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("rename: src or dst are empty");
       }
@@ -526,7 +530,7 @@ public class S3AFileSystem extends FileSystem {
       }
 
       List<DeleteObjectsRequest.KeyVersion> keysToDelete =
-        new ArrayList<DeleteObjectsRequest.KeyVersion>();
+        new ArrayList<>();
       if (dstStatus != null && dstStatus.isEmptyDirectory()) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
@@ -640,7 +644,7 @@ public class S3AFileSystem extends FileSystem {
       request.setMaxKeys(maxKeys);
 
       List<DeleteObjectsRequest.KeyVersion> keys =
-        new ArrayList<DeleteObjectsRequest.KeyVersion>();
+        new ArrayList<>();
       ObjectListing objects = s3.listObjects(request);
       statistics.incrementReadOps(1);
       while (true) {
@@ -663,7 +667,7 @@ public class S3AFileSystem extends FileSystem {
           objects = s3.listNextBatchOfObjects(objects);
           statistics.incrementReadOps(1);
         } else {
-          if (keys.size() > 0) {
+          if (!keys.isEmpty()) {
             DeleteObjectsRequest deleteRequest =
                 new DeleteObjectsRequest(bucket).withKeys(keys);
             s3.deleteObjects(deleteRequest);
@@ -751,7 +755,8 @@ public class S3AFileSystem extends FileSystem {
         }
       } else {
         result.add(new S3AFileStatus(summary.getSize(),
-            dateToLong(summary.getLastModified()), keyPath));
+            dateToLong(summary.getLastModified()), keyPath,
+            getDefaultBlockSize(f.makeQualified(uri, workingDir))));
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding: fi: " + keyPath);
         }
@@ -877,13 +882,16 @@ public class S3AFileSystem extends FileSystem {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Found exact file: fake directory");
           }
-          return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+          return new S3AFileStatus(true, true,
+              f.makeQualified(uri, workingDir));
         } else {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Found exact file: normal file");
           }
-          return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
-              f.makeQualified(uri, workingDir));
+          return new S3AFileStatus(meta.getContentLength(),
+              dateToLong(meta.getLastModified()),
+              f.makeQualified(uri, workingDir),
+              getDefaultBlockSize(f.makeQualified(uri, workingDir)));
         }
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() != 404) {
@@ -910,8 +918,10 @@ public class S3AFileSystem extends FileSystem {
           } else {
             LOG.warn("Found file (with /): real file? should not happen: {}", key);
-            return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
-                f.makeQualified(uri, workingDir));
+            return new S3AFileStatus(meta.getContentLength(),
+                dateToLong(meta.getLastModified()),
+                f.makeQualified(uri, workingDir),
+                getDefaultBlockSize(f.makeQualified(uri, workingDir)));
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
@@ -953,7 +963,8 @@ public class S3AFileSystem extends FileSystem {
           }
         }
 
-        return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir));
+        return new S3AFileStatus(true, false,
+            f.makeQualified(uri, workingDir));
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
@@ -1128,8 +1139,8 @@ public class S3AFileSystem extends FileSystem {
           s3.deleteObject(bucket, key + "/");
           statistics.incrementWriteOps(1);
         }
-      } catch (FileNotFoundException e) {
-      } catch (AmazonServiceException e) {}
+      } catch (FileNotFoundException | AmazonServiceException e) {
+      }
 
       if (f.isRoot()) {
         break;
@@ -1178,7 +1189,7 @@ public class S3AFileSystem extends FileSystem {
   @Deprecated
   public long getDefaultBlockSize() {
     // default to 32MB: large enough to minimize the impact of seeks
-    return getConf().getLong(FS_S3A_BLOCK_SIZE, 32 * 1024 * 1024);
+    return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
 
   private void printAmazonServiceException(AmazonServiceException ase) {
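
With the fix in place, the block size surfaced by `getFileStatus()` is the 32 MB default unless `fs.s3a.block.size` overrides it. A hedged client-side sketch of verifying that (bucket and object names are placeholders):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: override fs.s3a.block.size and confirm getFileStatus() reports it
// instead of 0. "example-bucket" and "/existing-object" are placeholders.
class BlockSizeCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong("fs.s3a.block.size", 64 * 1024 * 1024); // 64 MB
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    FileStatus st = fs.getFileStatus(new Path("/existing-object"));
    System.out.println("block size = " + st.getBlockSize()); // expect 67108864
  }
}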

File (new): hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlocksize.java

@@ -0,0 +1,93 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.fileStatsToString;

public class TestS3ABlocksize extends AbstractFSContractTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestS3ABlocksize.class);

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }

  @Rule
  public Timeout testTimeout = new Timeout(30 * 60 * 1000);

  @Test
  @SuppressWarnings("deprecation")
  public void testBlockSize() throws Exception {
    FileSystem fs = getFileSystem();
    long defaultBlockSize = fs.getDefaultBlockSize();
    assertEquals("incorrect blocksize",
        S3AFileSystem.DEFAULT_BLOCKSIZE, defaultBlockSize);
    long newBlockSize = defaultBlockSize * 2;
    fs.getConf().setLong(Constants.FS_S3A_BLOCK_SIZE, newBlockSize);

    Path dir = path("testBlockSize");
    Path file = new Path(dir, "file");
    createFile(fs, file, true, dataset(1024, 'a', 'z' - 'a'));
    FileStatus fileStatus = fs.getFileStatus(file);
    assertEquals("Double default block size in stat(): " + fileStatus,
        newBlockSize,
        fileStatus.getBlockSize());

    // check the listing & assert that the block size is picked up by
    // this route too.
    boolean found = false;
    FileStatus[] listing = fs.listStatus(dir);
    for (FileStatus stat : listing) {
      LOG.info("entry: {}", stat);
      if (file.equals(stat.getPath())) {
        found = true;
        assertEquals("Double default block size in ls(): " + stat,
            newBlockSize,
            stat.getBlockSize());
      }
    }
    assertTrue("Did not find " + fileStatsToString(listing, ", "), found);
  }

  @Test
  public void testRootFileStatusHasBlocksize() throws Throwable {
    FileSystem fs = getFileSystem();
    FileStatus status = fs.getFileStatus(new Path("/"));
    assertTrue("Invalid root blocksize",
        status.getBlockSize() >= 0);
  }

}

File (new): hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

@@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n