fix http firehose factory leaky connection in constructor (#8576)

* fix http firehose factory leaky connection in constructor

* stylin
This commit is contained in:
Clint Wylie 2019-09-24 16:08:43 -07:00 committed by Gian Merlino
parent 7c14fa08f8
commit eabddffd6e
1 changed files with 22 additions and 30 deletions

View File

@ -49,7 +49,6 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
{ {
private static final Logger log = new Logger(HttpFirehoseFactory.class); private static final Logger log = new Logger(HttpFirehoseFactory.class);
private final List<URI> uris; private final List<URI> uris;
private final boolean supportContentRange;
@Nullable @Nullable
private final String httpAuthenticationUsername; private final String httpAuthenticationUsername;
@Nullable @Nullable
@ -63,29 +62,25 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout, @JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry, @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
@Nullable @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
@JsonProperty("httpAuthenticationUsername") String httpAuthenticationUsername, @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
@Nullable )
@JsonProperty("httpAuthenticationPassword") PasswordProvider httpAuthenticationPasswordProvider
) throws IOException
{ {
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.uris = uris;
Preconditions.checkArgument(uris.size() > 0, "Empty URIs"); Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
final URLConnection connection = uris.get(0).toURL().openConnection(); this.uris = uris;
final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges);
this.httpAuthenticationUsername = httpAuthenticationUsername; this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider; this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
} }
@Nullable
@JsonProperty @JsonProperty
public String getHttpAuthenticationUsername() public String getHttpAuthenticationUsername()
{ {
return httpAuthenticationUsername; return httpAuthenticationUsername;
} }
@Nullable
@JsonProperty("httpAuthenticationPassword") @JsonProperty("httpAuthenticationPassword")
public PasswordProvider getHttpAuthenticationPasswordProvider() public PasswordProvider getHttpAuthenticationPasswordProvider()
{ {
@ -115,14 +110,16 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
protected InputStream openObjectStream(URI object, long start) throws IOException protected InputStream openObjectStream(URI object, long start) throws IOException
{ {
URLConnection urlConnection = openURLConnection(object); URLConnection urlConnection = openURLConnection(object);
if (supportContentRange && start > 0) { final String acceptRanges = urlConnection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
final boolean withRanges = "bytes".equalsIgnoreCase(acceptRanges);
if (withRanges && start > 0) {
// Set header for range request. // Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-". // Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1 // See https://tools.ietf.org/html/rfc7233#section-2.1
urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start)); urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
return urlConnection.getInputStream(); return urlConnection.getInputStream();
} else { } else {
if (!supportContentRange && start > 0) { if (!withRanges && start > 0) {
log.warn( log.warn(
"Since the input source doesn't support range requests, the object input stream is opened from the start and " "Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
@ -159,8 +156,8 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
getFetchTimeout() == that.getFetchTimeout() && getFetchTimeout() == that.getFetchTimeout() &&
getMaxFetchRetry() == that.getMaxFetchRetry() && getMaxFetchRetry() == that.getMaxFetchRetry() &&
httpAuthenticationUsername.equals(that.getHttpAuthenticationUsername()) && Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
httpAuthenticationPasswordProvider.equals(that.getHttpAuthenticationPasswordProvider()); Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
} }
@Override @Override
@ -187,7 +184,6 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@Override @Override
public FiniteFirehoseFactory<StringInputRowParser, URI> withSplit(InputSplit<URI> split) public FiniteFirehoseFactory<StringInputRowParser, URI> withSplit(InputSplit<URI> split)
{ {
try {
return new HttpFirehoseFactory( return new HttpFirehoseFactory(
Collections.singletonList(split.get()), Collections.singletonList(split.get()),
getMaxCacheCapacityBytes(), getMaxCacheCapacityBytes(),
@ -199,10 +195,6 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
httpAuthenticationPasswordProvider httpAuthenticationPasswordProvider
); );
} }
catch (IOException e) {
throw new RuntimeException(e);
}
}
private URLConnection openURLConnection(URI object) throws IOException private URLConnection openURLConnection(URI object) throws IOException
{ {