Add configurations for allowed protocols for HTTP and HDFS inputSources/firehoses (#10830)

* Allow only HTTP and HTTPS protocols for the HTTP inputSource

* rename

* Update core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* fix http firehose and update doc

* HDFS inputSource

* add configs for allowed protocols

* fix checkstyle and doc

* more checkstyle

* remove stale doc

* remove more doc

* Apply doc suggestions from code review

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>

* update hdfs address in docs

* fix test

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>
Jihoon Son 2021-03-06 11:43:00 -08:00 committed by GitHub
parent bddacbb1c3
commit 9946306d4b
18 changed files with 878 additions and 74 deletions

File: HttpInputSource.java

@@ -19,6 +19,7 @@
 package org.apache.druid.data.input.impl;

+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -28,6 +29,8 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PasswordProvider;

 import javax.annotation.Nullable;
@@ -45,18 +48,31 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final HttpInputSourceConfig config;

   @JsonCreator
   public HttpInputSource(
       @JsonProperty("uris") List<URI> uris,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
-      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JacksonInject HttpInputSourceConfig config
   )
   {
     Preconditions.checkArgument(uris != null && !uris.isEmpty(), "Empty URIs");
+    throwIfInvalidProtocols(config, uris);
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.config = config;
+  }
+
+  public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
+  {
+    for (URI uri : uris) {
+      if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(uri.getScheme()))) {
+        throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+      }
+    }
   }

   @JsonProperty
@@ -97,7 +113,8 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
     return new HttpInputSource(
         Collections.singletonList(split.get()),
         httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
+        httpAuthenticationPasswordProvider,
+        config
     );
   }
@@ -129,16 +146,17 @@ public class HttpInputSource extends AbstractInputSource implements SplittableIn
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    HttpInputSource source = (HttpInputSource) o;
-    return Objects.equals(uris, source.uris) &&
-           Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) &&
-           Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider);
+    HttpInputSource that = (HttpInputSource) o;
+    return Objects.equals(uris, that.uris) &&
+           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+           Objects.equals(config, that.config);
   }

   @Override
   public int hashCode()
   {
-    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider);
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config);
   }

   @Override
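A quick usage sketch of the check added above (illustrative only, not part of the patch; it uses only the constructor arguments and static method visible in this diff, and the URIs are placeholders):

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.net.URI;
import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.HttpInputSourceConfig;

public class HttpProtocolCheckSketch
{
  public static void main(String[] args)
  {
    // With the default config, http and https URIs pass the check.
    HttpInputSource.throwIfInvalidProtocols(
        new HttpInputSourceConfig(null),
        ImmutableList.of(URI.create("https://example.com/data.json"))
    );

    // A custom allow list replaces the default set rather than extending it.
    try {
      HttpInputSource.throwIfInvalidProtocols(
          new HttpInputSourceConfig(ImmutableSet.of("https")),
          ImmutableList.of(URI.create("http://example.com/data.json"))
      );
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Only [https] protocols are allowed"
    }
  }
}
```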

File: HttpInputSourceConfig.java (new file)

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class HttpInputSourceConfig
{
@VisibleForTesting
public static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("http", "https");
@JsonProperty
private final Set<String> allowedProtocols;
@JsonCreator
public HttpInputSourceConfig(
@JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
)
{
this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
? DEFAULT_ALLOWED_PROTOCOLS
: allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
}
public Set<String> getAllowedProtocols()
{
return allowedProtocols;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HttpInputSourceConfig that = (HttpInputSourceConfig) o;
return Objects.equals(allowedProtocols, that.allowedProtocols);
}
@Override
public int hashCode()
{
return Objects.hash(allowedProtocols);
}
@Override
public String toString()
{
return "HttpInputSourceConfig{" +
", allowedProtocols=" + allowedProtocols +
'}';
}
}

File: HttpInputSourceConfigTest.java (new file)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert;
import org.junit.Test;
public class HttpInputSourceConfigTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(HttpInputSourceConfig.class).usingGetClass().verify();
}
@Test
public void testNullAllowedProtocolsUseDefault()
{
HttpInputSourceConfig config = new HttpInputSourceConfig(null);
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
}
@Test
public void testEmptyAllowedProtocolsUseDefault()
{
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of());
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
}
@Test
public void testCustomAllowedProtocols()
{
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"));
Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
}
}

File: HttpInputSourceTest.java

@@ -19,29 +19,87 @@
 package org.apache.druid.data.input.impl;

+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;

 import java.io.IOException;
 import java.net.URI;

 public class HttpInputSourceTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerde() throws IOException
   {
+    HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
     final ObjectMapper mapper = new ObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig));
     final HttpInputSource source = new HttpInputSource(
         ImmutableList.of(URI.create("http://test.com/http-test")),
         "myName",
-        new DefaultPasswordProvider("myPassword")
+        new DefaultPasswordProvider("myPassword"),
+        httpInputSourceConfig
     );
     final byte[] json = mapper.writeValueAsBytes(source);
     final HttpInputSource fromJson = (HttpInputSource) mapper.readValue(json, InputSource.class);
     Assert.assertEquals(source, fromJson);
   }

+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocols()
+  {
+    new HttpInputSource(
+        ImmutableList.of(URI.create("http:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+
+    new HttpInputSource(
+        ImmutableList.of(URI.create("https:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [http, https] protocols are allowed");
+    new HttpInputSource(
+        ImmutableList.of(URI.create("my-protocol:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        new HttpInputSourceConfig(null)
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocols()
+  {
+    final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+    new HttpInputSource(
+        ImmutableList.of(URI.create("druid:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        customConfig
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HttpInputSource(
+        ImmutableList.of(URI.create("https:///")),
+        "myName",
+        new DefaultPasswordProvider("myPassword"),
+        customConfig
+    );
+  }
 }

File: docs/configuration/index.md

@@ -515,6 +515,28 @@ This deep storage is used to interface with Cassandra. Note that the `druid-cas
 |`druid.storage.keyspace`|Cassandra key space.|none|

+### Ingestion Security Configuration
+
+#### HDFS input source
+
+You can set the following property to specify permissible protocols for
+the [HDFS input source](../ingestion/native-batch.md#hdfs-input-source) and the [HDFS firehose](../ingestion/native-batch.md#hdfsfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|["hdfs"]|
+
+#### HTTP input source
+
+You can set the following property to specify permissible protocols for
+the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the [HTTP firehose](../ingestion/native-batch.md#httpfirehose).
+
+|Property|Possible Values|Description|Default|
+|--------|---------------|-----------|-------|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]|
+
 ### Task Logging

 If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
@@ -1355,6 +1377,7 @@ The amount of direct memory needed by Druid is at least
 ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
 line.

 #### Query Configurations

 See [general query configuration](#general-query-configuration).
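As a usage sketch of the two properties documented above (illustrative values only; the JSON-list syntax matches the module tests later in this commit), a service's runtime properties might contain:

```properties
# Restrict the HTTP input source and firehose to HTTPS only (hypothetical).
druid.ingestion.http.allowedProtocols=["https"]

# Allow webhdfs in addition to the default hdfs protocol (hypothetical).
druid.ingestion.hdfs.allowedProtocols=["hdfs", "webhdfs"]
```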

File: docs/ingestion/native-batch.md

@@ -1064,7 +1064,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": "hdfs://foo/bar/", "hdfs://bar/foo"
+      "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
     },
     "inputFormat": {
       "type": "json"
@@ -1080,7 +1080,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": ["hdfs://foo/bar", "hdfs://bar/foo"]
+      "paths": "hdfs://namenode_host/foo/bar/", "hdfs://namenode_host/bar/foo"
     },
     "inputFormat": {
       "type": "json"
@@ -1096,7 +1096,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": "hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"
+      "paths": "hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"
     },
     "inputFormat": {
       "type": "json"
@@ -1112,7 +1112,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "hdfs",
-      "paths": ["hdfs://foo/bar/file.json", "hdfs://bar/foo/file2.json"]
+      "paths": ["hdfs://namenode_host/foo/bar/file.json", "hdfs://namenode_host/bar/foo/file2.json"]
     },
     "inputFormat": {
       "type": "json"
@@ -1127,9 +1127,10 @@ Sample specs:
 |type|This should be `hdfs`.|None|yes|
 |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|

-You can also ingest from cloud storage using the HDFS input source.
-However, if you want to read from AWS S3 or Google Cloud Storage, consider using
-the [S3 input source](#s3-input-source) or the [Google Cloud Storage input source](#google-cloud-storage-input-source) instead.
+You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
+If you want to use a non-hdfs protocol with the HDFS input source, include the protocol
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS input source security configuration](../configuration/index.md#hdfs-input-source) for more details.

 ### HTTP Input Source
@@ -1209,10 +1210,13 @@ You can also use the other existing Druid PasswordProviders. Here is an example
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be `http`|None|yes|
-|uris|URIs of the input files.|None|yes|
+|uris|URIs of the input files. See below for the protocols allowed for URIs.|None|yes|
 |httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
 |httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|

+You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP input sources.
+The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
+
 ### Inline Input Source

 The Inline input source can be used to read the data inlined in its own spec.
@@ -1559,6 +1563,11 @@ Note that prefetching or caching isn't that useful in the Parallel task.
 |fetchTimeout|Timeout for fetching each file.|60000|
 |maxFetchRetry|Maximum number of retries for fetching each file.|3|

+You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage.
+However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
+If you want to use a non-hdfs protocol with the HDFS firehose, you need to include the protocol you want
+in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
+
 ### LocalFirehose

 This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
@@ -1596,6 +1605,9 @@ A sample HTTP Firehose spec is shown below:
 }
 ```

+You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP firehose input sources.
+The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
+
 The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
 Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
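For illustration only (not part of the patch): with the HDFS allow list extended, an HDFS input source could point at a non-hdfs protocol such as `webhdfs`, the same value used in HdfsStorageDruidModuleTest further down in this commit. A hypothetical spec fragment:

```json
"inputSource": {
  "type": "hdfs",
  "paths": "webhdfs://namenode_host/foo/bar/*.json"
}
```

Without `webhdfs` listed in `druid.ingestion.hdfs.allowedProtocols`, a spec like this is rejected when it is deserialized, with the error `Only [hdfs] protocols are allowed`.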

File: HdfsFirehoseFactory.java

@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -44,21 +45,25 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
 {
   private final List<String> inputPaths;
   private final Configuration conf;
+  private final HdfsInputSourceConfig inputSourceConfig;

   @JsonCreator
   public HdfsFirehoseFactory(
-      @JacksonInject @Hdfs Configuration conf,
       @JsonProperty("paths") Object inputPaths,
       @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
       @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
       @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
      @JsonProperty("fetchTimeout") Long fetchTimeout,
-      @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+      @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
+      @JacksonInject @Hdfs Configuration conf,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
-    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths");
+    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
     this.conf = conf;
+    this.inputSourceConfig = inputSourceConfig;
+    this.inputPaths.forEach(p -> HdfsInputSource.verifyProtocol(conf, inputSourceConfig, p));
   }

   @JsonProperty("paths")
@@ -109,21 +114,14 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa
   public FiniteFirehoseFactory<StringInputRowParser, Path> withSplit(InputSplit<Path> split)
   {
     return new HdfsFirehoseFactory(
-        conf,
         split.get().toString(),
         getMaxCacheCapacityBytes(),
         getMaxFetchCapacityBytes(),
         getPrefetchTriggerBytes(),
         getFetchTimeout(),
-        getMaxFetchRetry()
+        getMaxFetchRetry(),
+        conf,
+        inputSourceConfig
     );
   }
-
-  @Override
-  public String toString()
-  {
-    return "HdfsFirehoseFactory{" +
-           "inputPaths=" + inputPaths +
-           '}';
-  }
 }

File: HdfsInputSource.java

@@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.utils.Streams;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   private final List<String> inputPaths;
   private final Configuration configuration;
+  private final HdfsInputSourceConfig inputSourceConfig;

   // Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource
   // *does* cache the splits for the following reasons:
@@ -73,32 +75,49 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   //
   // 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone
   //    migrating from Hadoop.
-  private List<Path> cachedPaths;
+  @Nullable
+  private List<Path> cachedPaths = null;

   @JsonCreator
   public HdfsInputSource(
       @JsonProperty(PROP_PATHS) Object inputPaths,
-      @JacksonInject @Hdfs Configuration configuration
+      @JacksonInject @Hdfs Configuration configuration,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
   {
     this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
     this.configuration = configuration;
-    this.cachedPaths = null;
+    this.inputSourceConfig = inputSourceConfig;
+    this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
   }

   public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
   {
-    final List<String> paths;
     if (inputPaths instanceof String) {
-      paths = Collections.singletonList((String) inputPaths);
+      return Collections.singletonList((String) inputPaths);
     } else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
-      paths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+      return ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
     } else {
       throw new IAE("'%s' must be a string or an array of strings", propertyName);
     }
+  }

-    return paths;
+  public static void verifyProtocol(Configuration conf, HdfsInputSourceConfig config, String pathString)
+  {
+    Path path = new Path(pathString);
+    try {
+      throwIfInvalidProtocol(config, path.getFileSystem(conf).getScheme());
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void throwIfInvalidProtocol(HdfsInputSourceConfig config, String scheme)
+  {
+    if (!config.getAllowedProtocols().contains(StringUtils.toLowerCase(scheme))) {
+      throw new IAE("Only %s protocols are allowed", config.getAllowedProtocols());
+    }
   }

   public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
@@ -202,7 +221,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
     List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
-    return new HdfsInputSource(paths, configuration);
+    return new HdfsInputSource(paths, configuration, inputSourceConfig);
   }

   @Override
@@ -218,6 +237,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }

+  @VisibleForTesting
   static Builder builder()
   {
     return new Builder();
@@ -227,6 +247,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   {
     private Object paths;
     private Configuration configuration;
+    private HdfsInputSourceConfig inputSourceConfig;

     private Builder()
     {
@@ -244,9 +265,19 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
       return this;
     }

+    Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig)
+    {
+      this.inputSourceConfig = inputSourceConfig;
+      return this;
+    }
+
     HdfsInputSource build()
     {
-      return new HdfsInputSource(paths, configuration);
+      return new HdfsInputSource(
+          Preconditions.checkNotNull(paths, "paths"),
+          Preconditions.checkNotNull(configuration, "configuration"),
+          Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
+      );
     }
   }
 }
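A usage sketch of the new `verifyProtocol` check (illustrative only, not part of the patch; the addresses mirror the tests in this commit, and the relative-path behavior follows from resolving the scheme through `Path.getFileSystem(conf)`):

```java
import com.google.common.collect.ImmutableSet;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.hadoop.conf.Configuration;

public class HdfsProtocolCheckSketch
{
  public static void main(String[] args)
  {
    final Configuration conf = new Configuration();
    // Relative paths are resolved against the default filesystem, so they pass the default allow list.
    conf.set("fs.default.name", "hdfs://localhost:7020");

    final HdfsInputSourceConfig defaultConfig = new HdfsInputSourceConfig(null); // allows only "hdfs"

    HdfsInputSource.verifyProtocol(conf, defaultConfig, "/foo/bar");                   // ok, resolves to hdfs
    HdfsInputSource.verifyProtocol(conf, defaultConfig, "hdfs://localhost:10020/foo"); // ok, explicit hdfs

    try {
      // A local file path resolves to the "file" scheme, which the default config rejects.
      HdfsInputSource.verifyProtocol(conf, defaultConfig, "file:/tmp/foo");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Only [hdfs] protocols are allowed"
    }

    // A custom allow list replaces the default rather than extending it.
    final HdfsInputSourceConfig ftpOnly = new HdfsInputSourceConfig(ImmutableSet.of("ftp"));
    try {
      HdfsInputSource.verifyProtocol(conf, ftpOnly, "hdfs://localhost:7020/foo");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Only [ftp] protocols are allowed"
    }
  }
}
```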

File: HdfsInputSourceConfig.java (new file)

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.Set;
import java.util.stream.Collectors;
public class HdfsInputSourceConfig
{
static final Set<String> DEFAULT_ALLOWED_PROTOCOLS = ImmutableSet.of("hdfs");
@JsonProperty
private final Set<String> allowedProtocols;
@JsonCreator
public HdfsInputSourceConfig(
@JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
)
{
this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
? DEFAULT_ALLOWED_PROTOCOLS
: allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
}
public Set<String> getAllowedProtocols()
{
return allowedProtocols;
}
}

File: HdfsStorageDruidModule.java

@@ -35,6 +35,7 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -118,5 +119,7 @@ public class HdfsStorageDruidModule implements DruidModule
     JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
     binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
     LifecycleModule.register(binder, HdfsStorageAuthentication.class);
+
+    JsonConfigProvider.bind(binder, "druid.ingestion.hdfs", HdfsInputSourceConfig.class);
   }
 }

File: HdfsFirehoseFactoryTest.java

@@ -19,30 +19,48 @@
 package org.apache.druid.firehose.hdfs;

-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;

 import java.io.IOException;
 import java.util.Collections;

 public class HdfsFirehoseFactoryTest
 {
+  private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
+  private static final Configuration DEFAULT_CONFIGURATION = new Configuration();
+
+  @BeforeClass
+  public static void setup()
+  {
+    DEFAULT_CONFIGURATION.set("fs.default.name", "hdfs://localhost:7020");
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testArrayPaths() throws IOException
   {
     final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
-        null,
         Collections.singletonList("/foo/bar"),
         null,
         null,
         null,
         null,
-        null
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
     );

     final ObjectMapper mapper = createMapper();
@@ -59,7 +77,16 @@ public class HdfsFirehoseFactoryTest
   @Test
   public void testStringPaths() throws IOException
   {
-    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null);
+    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
+        "/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
     final ObjectMapper mapper = createMapper();

     final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
@@ -71,11 +98,121 @@ public class HdfsFirehoseFactoryTest
     );
   }

+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocol()
+  {
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:7020/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [hdfs] protocols are allowed");
+    new HdfsFirehoseFactory(
+        "file:/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocol()
+  {
+    final Configuration conf = new Configuration();
+    conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+    new HdfsFirehoseFactory(
+        "ftp://localhost:21/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        new HdfsInputSourceConfig(ImmutableSet.of("ftp"))
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:7020/foo/bar",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        new HdfsInputSourceConfig(ImmutableSet.of("druid"))
+    );
+  }
+
+  @Test
+  public void testConstructorWithDefaultHdfs()
+  {
+    new HdfsFirehoseFactory(
+        "/foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "hdfs:///foo/bar*",
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+
+    new HdfsFirehoseFactory(
+        "hdfs://localhost:10020/foo/bar*", // different hdfs
+        null,
+        null,
+        null,
+        null,
+        null,
+        DEFAULT_CONFIGURATION,
+        DEFAULT_INPUT_SOURCE_CONFIG
+    );
+  }
+
   private static ObjectMapper createMapper()
   {
     final ObjectMapper mapper = new ObjectMapper();
     new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
-    mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+    mapper.setInjectableValues(
+        new Std()
+            .addValue(Configuration.class, DEFAULT_CONFIGURATION)
+            .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+    );
     return mapper;
   }
 }

File: HdfsInputSourceConfigTest.java (new file)

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
public class HdfsInputSourceConfigTest
{
@Test
public void testNullAllowedProtocolsUseDefault()
{
HdfsInputSourceConfig config = new HdfsInputSourceConfig(null);
Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
}
@Test
public void testEmptyAllowedProtocolsUseDefault()
{
HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of());
Assert.assertEquals(HdfsInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
}
@Test
public void testCustomAllowedProtocols()
{
HdfsInputSourceConfig config = new HdfsInputSourceConfig(ImmutableSet.of("druid"));
Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
}
}

File: HdfsInputSourceTest.java

@@ -20,8 +20,9 @@
 package org.apache.druid.inputsource.hdfs;

 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -68,8 +69,9 @@ import java.util.stream.IntStream;
 @RunWith(Enclosed.class)
 public class HdfsInputSourceTest extends InitializedNullHandlingTest
 {
-  private static final String PATH = "/foo/bar";
+  private static final String PATH = "hdfs://localhost:7020/foo/bar";
   private static final Configuration CONFIGURATION = new Configuration();
+  private static final HdfsInputSourceConfig DEFAULT_INPUT_SOURCE_CONFIG = new HdfsInputSourceConfig(null);
   private static final String COLUMN = "value";
   private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
       new TimestampSpec(null, null, null),
@@ -84,6 +86,80 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       0
   );

+  public static class ConstructorTest
+  {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testConstructorAllowsOnlyDefaultProtocol()
+    {
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [hdfs] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths("file:/foo/bar*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+
+    @Test
+    public void testConstructorAllowsOnlyCustomProtocol()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.ftp.impl", "org.apache.hadoop.fs.ftp.FTPFileSystem");
+      HdfsInputSource.builder()
+                     .paths("ftp://localhost:21/foo/bar")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("ftp")))
+                     .build();
+
+      expectedException.expect(IllegalArgumentException.class);
+      expectedException.expectMessage("Only [druid] protocols are allowed");
+      HdfsInputSource.builder()
+                     .paths(PATH + "*")
+                     .configuration(CONFIGURATION)
+                     .inputSourceConfig(new HdfsInputSourceConfig(ImmutableSet.of("druid")))
+                     .build();
+    }
+
+    @Test
+    public void testConstructorWithDefaultHdfs()
+    {
+      final Configuration conf = new Configuration();
+      conf.set("fs.default.name", "hdfs://localhost:7020");
+
+      HdfsInputSource.builder()
+                     .paths("/foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs:///foo/bar*")
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+
+      HdfsInputSource.builder()
+                     .paths("hdfs://localhost:10020/foo/bar*") // different hdfs
+                     .configuration(conf)
+                     .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                     .build();
+    }
+  }
+
   public static class SerializeDeserializeTest
   {
     private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
@@ -98,7 +174,8 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
     {
       hdfsInputSourceBuilder = HdfsInputSource.builder()
                                               .paths(PATH)
-                                              .configuration(CONFIGURATION);
+                                              .configuration(CONFIGURATION)
+                                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG);
     }

     @Test
@@ -139,7 +216,11 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
     private static ObjectMapper createObjectMapper()
     {
       final ObjectMapper mapper = new ObjectMapper();
-      mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+      mapper.setInjectableValues(
+          new Std()
+              .addValue(Configuration.class, new Configuration())
+              .addValue(HdfsInputSourceConfig.class, DEFAULT_INPUT_SOURCE_CONFIG)
+      );
       new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
       return mapper;
     }
@@ -204,6 +285,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       target = HdfsInputSource.builder()
                               .paths(dfsCluster.getURI() + PATH + "*")
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }
@@ -304,6 +386,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       target = HdfsInputSource.builder()
                               .paths(Collections.emptyList())
                               .configuration(CONFIGURATION)
+                              .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
                               .build();
     }

File: HdfsStorageDruidModuleTest.java (new file)

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.hdfs;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.junit.Assert;
import org.junit.Test;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
public class HdfsStorageDruidModuleTest
{
@Test
public void testHdfsInputSourceConfigDefaultAllowedProtocols()
{
Properties props = new Properties();
Injector injector = makeInjectorWithProperties(props);
HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
Assert.assertEquals(
ImmutableSet.of("hdfs"),
instance.getAllowedProtocols()
);
}
@Test
public void testHdfsInputSourceConfigCustomAllowedProtocols()
{
Properties props = new Properties();
props.setProperty("druid.ingestion.hdfs.allowedProtocols", "[\"webhdfs\"]");
Injector injector = makeInjectorWithProperties(props);
HdfsInputSourceConfig instance = injector.getInstance(HdfsInputSourceConfig.class);
Assert.assertEquals(
ImmutableSet.of("webhdfs"),
instance.getAllowedProtocols()
);
}
private Injector makeInjectorWithProperties(final Properties props)
{
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new LifecycleModule(),
binder -> {
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
},
new HdfsStorageDruidModule()
)
);
}
}

File: InputSourceModule.java

@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.initialization.DruidModule;

 import java.util.List;
@@ -47,5 +49,6 @@ public class InputSourceModule implements DruidModule
   @Override
   public void configure(Binder binder)
   {
+    JsonConfigProvider.bind(binder, "druid.ingestion.http", HttpInputSourceConfig.class);
   }
 }

File: HttpFirehoseFactory.java

@@ -19,6 +19,7 @@
 package org.apache.druid.segment.realtime.firehose;

+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
@@ -26,9 +27,10 @@ import com.google.common.base.Predicate;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.HttpEntity;
+import org.apache.druid.data.input.impl.HttpInputSource;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.PasswordProvider;
 import org.apache.druid.utils.CompressionUtils;
@@ -43,12 +45,12 @@ import java.util.Objects;
 public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
 {
-  private static final Logger log = new Logger(HttpFirehoseFactory.class);
   private final List<URI> uris;
   @Nullable
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final HttpInputSourceConfig inputSourceConfig;

   @JsonCreator
   public HttpFirehoseFactory(
@@ -59,14 +61,17 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
       @JsonProperty("fetchTimeout") Long fetchTimeout,
       @JsonProperty("maxFetchRetry") Integer maxFetchRetry,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
-      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider
+      @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JacksonInject HttpInputSourceConfig inputSourceConfig
   )
   {
     super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
     Preconditions.checkArgument(uris.size() > 0, "Empty URIs");
+    HttpInputSource.throwIfInvalidProtocols(inputSourceConfig, uris);
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.inputSourceConfig = inputSourceConfig;
   }

   @Nullable
@@ -120,35 +125,20 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
-    return Objects.equals(uris, that.uris) &&
-           getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
-           getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
-           getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
-           getFetchTimeout() == that.getFetchTimeout() &&
-           getMaxFetchRetry() == that.getMaxFetchRetry() &&
-           Objects.equals(httpAuthenticationUsername, that.getHttpAuthenticationUsername()) &&
-           Objects.equals(httpAuthenticationPasswordProvider, that.getHttpAuthenticationPasswordProvider());
+    HttpFirehoseFactory that = (HttpFirehoseFactory) o;
+    return uris.equals(that.uris) &&
+           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
+           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
+           inputSourceConfig.equals(that.inputSourceConfig);
   }

   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        uris,
-        getMaxCacheCapacityBytes(),
-        getMaxFetchCapacityBytes(),
-        getPrefetchTriggerBytes(),
-        getFetchTimeout(),
-        getMaxFetchRetry(),
-        httpAuthenticationUsername,
-        httpAuthenticationPasswordProvider
-    );
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, inputSourceConfig);
   }

   @Override
@@ -168,7 +158,8 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
         getFetchTimeout(),
         getMaxFetchRetry(),
         getHttpAuthenticationUsername(),
-        httpAuthenticationPasswordProvider
+        httpAuthenticationPasswordProvider,
+        inputSourceConfig
     );
   }
 }

File: InputSourceModuleTest.java

@@ -25,12 +25,25 @@ import com.fasterxml.jackson.databind.cfg.MapperConfig;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
 import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigurator;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.ServerModule;
+import org.apache.druid.jackson.JacksonModule;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

+import javax.validation.Validation;
+import javax.validation.Validator;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;

 public class InputSourceModuleTest
@@ -59,4 +72,31 @@ public class InputSourceModuleTest
     Assert.assertNotNull(subtypes);
     Assert.assertEquals(SQL_NAMED_TYPE, Iterables.getOnlyElement(subtypes));
   }
+
+  @Test
+  public void testHttpInputSourceDefaultConfig()
+  {
+    Properties props = new Properties();
+    Injector injector = makeInjectorWithProperties(props);
+    HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class);
+    Assert.assertEquals(new HttpInputSourceConfig(null), instance);
+    Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, instance.getAllowedProtocols());
+  }
+
+  private Injector makeInjectorWithProperties(final Properties props)
+  {
+    return Guice.createInjector(
+        ImmutableList.of(
+            new DruidGuiceExtensions(),
+            new LifecycleModule(),
+            new ServerModule(),
+            binder -> {
+              binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
+              binder.bind(JsonConfigurator.class).in(LazySingleton.class);
+              binder.bind(Properties.class).toInstance(props);
+            },
+            new JacksonModule(),
+            new InputSourceModule()
+        ));
+  }
 }

File: HttpFirehoseFactoryTest.java

@@ -19,22 +19,36 @@
 package org.apache.druid.segment.realtime.firehose;

+import com.fasterxml.jackson.databind.InjectableValues.Std;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.HttpInputSourceConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;

 import java.io.IOException;
 import java.net.URI;

 public class HttpFirehoseFactoryTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerde() throws IOException
   {
+    final HttpInputSourceConfig inputSourceConfig = new HttpInputSourceConfig(null);
     final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.setInjectableValues(new Std().addValue(
+        HttpInputSourceConfig.class,
+        inputSourceConfig
+    ));
+
     final DefaultPasswordProvider pwProvider = new DefaultPasswordProvider("testPassword");
     final HttpFirehoseFactory factory = new HttpFirehoseFactory(
         ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
@@ -44,7 +58,8 @@ public class HttpFirehoseFactoryTest
         100L,
         5,
         "testUser",
-        pwProvider
+        pwProvider,
+        inputSourceConfig
     );

     final HttpFirehoseFactory outputFact = mapper.readValue(
@@ -54,4 +69,77 @@ public class HttpFirehoseFactoryTest
     Assert.assertEquals(factory, outputFact);
   }
+
+  @Test
+  public void testConstructorAllowsOnlyDefaultProtocols()
+  {
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("http:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("https:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [http, https] protocols are allowed");
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("my-protocol:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        new HttpInputSourceConfig(null)
+    );
+  }
+
+  @Test
+  public void testConstructorAllowsOnlyCustomProtocols()
+  {
+    final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("druid:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        customConfig
+    );
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Only [druid] protocols are allowed");
+    new HttpFirehoseFactory(
+        ImmutableList.of(URI.create("https:///")),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        customConfig
+    );
+  }
 }