Get s3 objects directly from prefixes when listing is failed due to permission (#4444)

* Fall back to getObject when listing is failed due to permission

* Throws an exception when listing is not allowed on directory

* Fix error messages
This commit is contained in:
Jihoon Son 2017-06-28 10:58:37 +09:00 committed by Gian Merlino
parent e3c13c246a
commit 79fd5338e3
1 changed files with 30 additions and 5 deletions

View File

@ -27,6 +27,7 @@ import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -71,11 +72,11 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
this.prefixes = prefixes == null ? new ArrayList<>() : prefixes;
if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) {
throw new IAE("uris and directories cannot be used together");
throw new IAE("uris and prefixes cannot be used together");
}
if (this.uris.isEmpty() && this.prefixes.isEmpty()) {
throw new IAE("uris or directories must be specified");
throw new IAE("uris or prefixes must be specified");
}
for (final URI inputURI : this.uris) {
@ -131,12 +132,36 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
MAX_LISTING_LENGTH,
lastKey
);
Arrays.stream(objectsChunk.getObjects()).forEach(storageObject -> objects.add((S3Object) storageObject));
Arrays.stream(objectsChunk.getObjects())
.filter(storageObject -> !storageObject.isDirectoryPlaceholder())
.forEach(storageObject -> objects.add((S3Object) storageObject));
lastKey = objectsChunk.getPriorLastKey();
} while (!objectsChunk.isListingComplete());
}
catch (ServiceException e) {
throw new IOException(e);
catch (ServiceException outerException) {
log.error(outerException, "Exception while listing on %s", uri);
if (outerException.getResponseCode() == 403) {
// The "Access Denied" means users might not have a proper permission for listing on the given uri.
// Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes.
// In this case, users should be able to get objects if they have a proper permission for GetObject.
log.warn("Access denied for %s. Try to get the object from the uri without listing", uri);
try {
final S3Object s3Object = s3Client.getObject(bucket, prefix);
if (!s3Object.isDirectoryPlaceholder()) {
objects.add(s3Object);
} else {
throw new IOException(uri + " is a directory placeholder, "
+ "but failed to get the object list under the directory due to permission");
}
}
catch (S3ServiceException innerException) {
throw new IOException(innerException);
}
} else {
throw new IOException(outerException);
}
}
}
return objects;