Enhance the Http Firehose to work with URIs requiring basic authentication (#7145)

* Enhance the HttpFirehose to work with both insecure URIs and URIs requiring basic authentication

* Improve security of enhanced HttpFirehoseFactory by not logging auth credentials

* Fix checkstyle failure in HttpFirehoseFactory.java

* Update docs and fix TeamCity build with required noinspection

* Indentation cleanup and logic modification for HttpFirehose object stream

* Remove default Empty string password provider in http firehose

* Add JavaDoc for MixIn describing its intended use

* Reverting documentation notation for json code to be in line with the rest of the docs

* Improve instantiation of ObjectMappers that require MixIn for redacting password from task logs

* Add comment to clarify fully qualified references of Objects in SQLMetadataStorageActionHandler
This commit is contained in:
Lucas Capistrant 2019-04-15 16:29:01 -05:00 committed by Jihoon Son
parent 4654e1e851
commit 8acad27d99
7 changed files with 150 additions and 18 deletions

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* This Interface is used as a MixIn for ObjectMapper objects when there is a desire to avoid serializing a Password
* from a PasswordProvider to JSON in plaintext when that JSON is going to be used for purposes that don't require the
* password to be present, such as logging to a file.
*/
public interface PasswordProviderRedactionMixIn
{
/**
 * Mirrors the {@code getPassword()} accessor of the type this mix-in is registered against. The method is never
 * implemented here; it exists only to carry {@link JsonIgnore}, so that an ObjectMapper configured with this mix-in
 * leaves the password out of the JSON it writes.
 */
@JsonIgnore
String getPassword();
}

View File

@ -58,4 +58,13 @@ public class DefaultPasswordProviderTest
PasswordProvider.class); PasswordProvider.class);
Assert.assertEquals(pwd, pp.getPassword()); Assert.assertEquals(pwd, pp.getPassword());
} }
/**
 * Verifies that serializing a {@link DefaultPasswordProvider} through a mapper carrying
 * {@link PasswordProviderRedactionMixIn} omits the password and emits only the type information.
 */
@Test
public void testSerializationWithMixIn() throws Exception
{
  final DefaultPasswordProvider pp = new DefaultPasswordProvider(pwd);
  // Register the mix-in on a copy: jsonMapper is declared outside this method, and mutating it here would make
  // the redaction visible to any other test that uses the same mapper instance.
  final String valueAsString = jsonMapper
      .copy()
      .addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class)
      .writeValueAsString(pp);
  Assert.assertEquals("{\"type\":\"default\"}", valueAsString);
}
} }

View File

@ -74,6 +74,39 @@ A sample http firehose spec is shown below:
} }
``` ```
The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
|property|description|default|
|--------|-----------|-------|
|httpAuthenticationUsername|Username to use for authentication with specified URIs|None|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs|None|
Example with authentication fields using the DefaultPasswordProvider (this requires the password to be in the ingestion spec):
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": "password123"
}
```
You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
```json
{
"type": "http",
"uris": ["http://example.com/uri1", "http://example2.com/uri2"],
"httpAuthenticationUsername": "username",
"httpAuthenticationPassword": {
"type": "environment",
"variable": "HTTP_FIREHOSE_PW"
}
}
```
The below configurations can be optionally used for tuning the firehose performance. The below configurations can be optionally used for tuning the firehose performance.
|property|description|default| |property|description|default|

View File

@ -37,6 +37,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -80,7 +82,7 @@ public class ExecutorLifecycle
this.taskConfig = taskConfig; this.taskConfig = taskConfig;
this.taskActionClientFactory = taskActionClientFactory; this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner; this.taskRunner = taskRunner;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper.copy().addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
} }
@LifecycleStart @LifecycleStart

View File

@ -82,7 +82,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
) )
{ {
this.connector = connector; this.connector = connector;
this.jsonMapper = jsonMapper; //fully qualified references required below due to identical package names across project modules.
//noinspection UnnecessaryFullyQualifiedName
this.jsonMapper = jsonMapper.copy().addMixIn(org.apache.druid.metadata.PasswordProvider.class,
org.apache.druid.metadata.PasswordProviderRedactionMixIn.class);
this.entryType = types.getEntryType(); this.entryType = types.getEntryType();
this.statusType = types.getStatusType(); this.statusType = types.getStatusType();
this.logType = types.getLogType(); this.logType = types.getLogType();

View File

@ -21,8 +21,10 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.net.HttpHeaders; import com.google.common.net.HttpHeaders;
import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.InputSplit;
@ -30,12 +32,15 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.net.URLConnection; import java.net.URLConnection;
import java.util.Base64;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -46,6 +51,10 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
private static final Logger log = new Logger(HttpFirehoseFactory.class); private static final Logger log = new Logger(HttpFirehoseFactory.class);
private final List<URI> uris; private final List<URI> uris;
private final boolean supportContentRange; private final boolean supportContentRange;
@Nullable
private final String httpAuthenticationUsername;
@Nullable
private final PasswordProvider httpAuthenticationPasswordProvider;
@JsonCreator @JsonCreator
public HttpFirehoseFactory( public HttpFirehoseFactory(
@ -54,7 +63,11 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout, @JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
@Nullable
@JsonProperty("httpAuthenticationUsername") String httpAuthenticationUsername,
@Nullable
@JsonProperty("httpAuthenticationPassword") PasswordProvider httpAuthenticationPasswordProvider
) throws IOException ) throws IOException
{ {
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
@ -64,6 +77,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
final URLConnection connection = uris.get(0).toURL().openConnection(); final URLConnection connection = uris.get(0).toURL().openConnection();
final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES); final String acceptRanges = connection.getHeaderField(HttpHeaders.ACCEPT_RANGES);
this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges); this.supportContentRange = acceptRanges != null && "bytes".equalsIgnoreCase(acceptRanges);
this.httpAuthenticationUsername = httpAuthenticationUsername;
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
}
/**
 * Returns the username configured for HTTP basic authentication, or {@code null} when none was supplied.
 * Exposed as a JSON property named after this getter.
 */
@JsonProperty
public String getHttpAuthenticationUsername()
{
  return this.httpAuthenticationUsername;
}
/**
 * Returns the {@link PasswordProvider} that supplies the password for HTTP basic authentication, or {@code null}
 * when none was supplied. It is exposed under the JSON property name {@code httpAuthenticationPassword}, which
 * differs from the field name.
 */
@JsonProperty("httpAuthenticationPassword")
public PasswordProvider getHttpAuthenticationPasswordProvider()
{
return httpAuthenticationPasswordProvider;
} }
@JsonProperty @JsonProperty
@ -81,26 +108,29 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@Override @Override
protected InputStream openObjectStream(URI object) throws IOException protected InputStream openObjectStream(URI object) throws IOException
{ {
return object.toURL().openConnection().getInputStream(); // A start value of 0 ensures no bytes of the InputStream are skipped
return openObjectStream(object, 0);
} }
@Override @Override
protected InputStream openObjectStream(URI object, long start) throws IOException protected InputStream openObjectStream(URI object, long start) throws IOException
{ {
if (supportContentRange) { URLConnection urlConnection = openURLConnection(object);
final URLConnection connection = object.toURL().openConnection(); if (supportContentRange && start > 0) {
// Set header for range request. // Set header for range request.
// Since we need to set only the start offset, the header is "bytes=<range-start>-". // Since we need to set only the start offset, the header is "bytes=<range-start>-".
// See https://tools.ietf.org/html/rfc7233#section-2.1 // See https://tools.ietf.org/html/rfc7233#section-2.1
connection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start)); urlConnection.addRequestProperty(HttpHeaders.RANGE, StringUtils.format("bytes=%d-", start));
return connection.getInputStream(); return urlConnection.getInputStream();
} else { } else {
log.warn( if (!supportContentRange && start > 0) {
"Since the input source doesn't support range requests, the object input stream is opened from the start and " log.warn(
+ "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message" "Since the input source doesn't support range requests, the object input stream is opened from the start and "
+ " a lot." + "then skipped. This may make the ingestion speed slower. Consider enabling prefetch if you see this message"
); + " a lot."
final InputStream in = openObjectStream(object); );
}
final InputStream in = urlConnection.getInputStream();
in.skip(start); in.skip(start);
return in; return in;
} }
@ -129,7 +159,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
getFetchTimeout() == that.getFetchTimeout() && getFetchTimeout() == that.getFetchTimeout() &&
getMaxFetchRetry() == that.getMaxFetchRetry(); getMaxFetchRetry() == that.getMaxFetchRetry() &&
httpAuthenticationUsername.equals(that.getHttpAuthenticationUsername()) &&
httpAuthenticationPasswordProvider.equals(that.getHttpAuthenticationPasswordProvider());
} }
@Override @Override
@ -141,7 +173,9 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes(), getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(), getPrefetchTriggerBytes(),
getFetchTimeout(), getFetchTimeout(),
getMaxFetchRetry() getMaxFetchRetry(),
httpAuthenticationUsername,
httpAuthenticationPasswordProvider
); );
} }
@ -161,11 +195,25 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
getMaxFetchCapacityBytes(), getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(), getPrefetchTriggerBytes(),
getFetchTimeout(), getFetchTimeout(),
getMaxFetchRetry() getMaxFetchRetry(),
getHttpAuthenticationUsername(),
httpAuthenticationPasswordProvider
); );
} }
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
/**
 * Creates a {@link URLConnection} for the given URI and, when credentials are configured, attaches an HTTP Basic
 * {@code Authorization} request header to it.
 *
 * The header is added only when the username is non-empty and a password provider is present; otherwise the
 * connection is returned without any authentication header. The header value is {@code "Basic "} followed by the
 * Base64 encoding of the UTF-8 bytes of {@code username:password}.
 *
 * @param object the URI to create a connection for
 * @return the connection, with the {@code Authorization} request property set when credentials are configured
 * @throws IOException if the URI cannot be converted to a URL or the connection cannot be created
 */
@VisibleForTesting
URLConnection openURLConnection(URI object) throws IOException
{
  final URLConnection urlConnection = object.toURL().openConnection();
  if (Strings.isNullOrEmpty(httpAuthenticationUsername) || httpAuthenticationPasswordProvider == null) {
    // No usable credentials: leave the request unauthenticated.
    return urlConnection;
  }
  final String userPass = httpAuthenticationUsername + ":" + httpAuthenticationPasswordProvider.getPassword();
  final String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
  // Use the shared header-name constant, as the Accept-Ranges and Range headers in this class already do.
  urlConnection.setRequestProperty(HttpHeaders.AUTHORIZATION, basicAuthString);
  return urlConnection;
}
} }

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -34,13 +35,16 @@ public class HttpFirehoseFactoryTest
public void testSerde() throws IOException public void testSerde() throws IOException
{ {
final ObjectMapper mapper = new DefaultObjectMapper(); final ObjectMapper mapper = new DefaultObjectMapper();
final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
final HttpFirehoseFactory factory = new HttpFirehoseFactory( final HttpFirehoseFactory factory = new HttpFirehoseFactory(
ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")), ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
2048L, 2048L,
1024L, 1024L,
512L, 512L,
100L, 100L,
5 5,
"testUser",
pwProvider
); );
final HttpFirehoseFactory outputFact = mapper.readValue( final HttpFirehoseFactory outputFact = mapper.readValue(