diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java index a9d758e7cad..860f6cc1106 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -49,7 +49,6 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory uris; - private final boolean supportContentRange; @Nullable private final String httpAuthenticationUsername; @Nullable @@ -63,29 +62,25 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory 0, "Empty URIs"); - final URLConnection connection = uris.get(0).toURL().openConnection(); - final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES); - this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges); + this.uris = uris; this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; } + @Nullable @JsonProperty public String getHttpAuthenticationUsername() { return httpAuthenticationUsername; } + @Nullable @JsonProperty("httpAuthenticationPassword") public PasswordProvider getHttpAuthenticationPasswordProvider() { @@ -115,14 +110,16 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory 0) { + final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES); + final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges); + if (withRanges && start > 0) { // Set header for range request. // Since we need to set only the start offset, the header is "bytes=-". // See https://tools.ietf.org/html/rfc7233#section-2.1 urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start)); return urlConnection.getInputStream(); } else { - if (!supportContentRange && start > 0) { + if (!withRanges && start > 0) { log.warn( "Since the input source doesn't support range requests, the object input stream is opened from the start and " + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" @@ -159,8 +156,8 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory withSplit(InputSplit split) { - try { - return new HttpFirehoseFactory( - Collections.singletonList(split.get()), - getMaxCacheCapacityBytes(), - getMaxFetchCapacityBytes(), - getPrefetchTriggerBytes(), - getFetchTimeout(), - getMaxFetchRetry(), - getHttpAuthenticationUsername(), - httpAuthenticationPasswordProvider - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } + return new HttpFirehoseFactory( + Collections.singletonList(split.get()), + getMaxCacheCapacityBytes(), + getMaxFetchCapacityBytes(), + getPrefetchTriggerBytes(), + getFetchTimeout(), + getMaxFetchRetry(), + getHttpAuthenticationUsername(), + httpAuthenticationPasswordProvider + ); } private URLConnection openURLConnection(URI object) throws IOException