diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index 95ae9394a70..89e281e1931 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -27,6 +27,7 @@ import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; +import org.jets3t.service.S3ServiceException; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageObjectsChunk; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -71,11 +72,11 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor this.prefixes = prefixes == null ? new ArrayList<>() : prefixes; if (!this.uris.isEmpty() && !this.prefixes.isEmpty()) { - throw new IAE("uris and directories cannot be used together"); + throw new IAE("uris and prefixes cannot be used together"); } if (this.uris.isEmpty() && this.prefixes.isEmpty()) { - throw new IAE("uris or directories must be specified"); + throw new IAE("uris or prefixes must be specified"); } for (final URI inputURI : this.uris) { @@ -131,12 +132,36 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor MAX_LISTING_LENGTH, lastKey ); - Arrays.stream(objectsChunk.getObjects()).forEach(storageObject -> objects.add((S3Object) storageObject)); + Arrays.stream(objectsChunk.getObjects()) + .filter(storageObject -> !storageObject.isDirectoryPlaceholder()) + .forEach(storageObject -> objects.add((S3Object) storageObject)); lastKey = objectsChunk.getPriorLastKey(); } while (!objectsChunk.isListingComplete()); } - catch (ServiceException e) { - throw new IOException(e); + catch (ServiceException outerException) { + log.error(outerException, "Exception while listing on %s", uri); + + if (outerException.getResponseCode() == 403) { + // The "Access Denied" means users might not have a proper permission for listing on the given uri. + // Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes. + // In this case, users should be able to get objects if they have a proper permission for GetObject. + + log.warn("Access denied for %s. Try to get the object from the uri without listing", uri); + try { + final S3Object s3Object = s3Client.getObject(bucket, prefix); + if (!s3Object.isDirectoryPlaceholder()) { + objects.add(s3Object); + } else { + throw new IOException(uri + " is a directory placeholder, " + + "but failed to get the object list under the directory due to permission"); + } + } + catch (S3ServiceException innerException) { + throw new IOException(innerException); + } + } else { + throw new IOException(outerException); + } } } return objects;