Add HDFS firehose (#8754)

* Add HDFS firehose.

* Tests, support for lists of paths.

* Fixups.

* Update list of firehoses.

* Wildcards is a word.
Gian Merlino 2019-10-28 08:07:38 -07:00 committed by Fangjin Yang
parent f9b94a5db1
commit b65d2ac648
8 changed files with 312 additions and 57 deletions

View File

@ -27,7 +27,7 @@ To use this Apache Druid (incubating) extension, make sure to [include](../../de
## Deep Storage
### Configuration
### Configuration for HDFS
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
@ -39,12 +39,10 @@ To use this Apache Druid (incubating) extension, make sure to [include](../../de
If you are using the Hadoop indexer, set your output directory to be a location on Hadoop and it will work.
If you want to eagerly authenticate against a secured Hadoop/HDFS cluster, you must set `druid.hadoop.security.kerberos.principal` and `druid.hadoop.security.kerberos.keytab`. This is an alternative to the cron job method that runs the `kinit` command periodically.
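For example, a minimal sketch of the relevant runtime properties, assuming a hypothetical principal and keytab path:

```
# Hypothetical values; use your own Kerberos principal and keytab location.
druid.hadoop.security.kerberos.principal=druid@EXAMPLE.COM
druid.hadoop.security.kerberos.keytab=/etc/security/keytabs/druid.keytab
```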
## Google Cloud Storage
### Configuration for Google Cloud Storage
The HDFS extension can also be used for GCS as deep storage.
### Configuration
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.storage.type`|hdfs||Must be set.|
@ -53,3 +51,34 @@ The HDFS extension can also be used for GCS as deep storage.
All services that need to access GCS need to have the [GCS connector jar](https://cloud.google.com/hadoop/google-cloud-storage-connector#manualinstallation) in their class path. One option is to place this jar in <druid>/lib/ and <druid>/extensions/druid-hdfs-storage/
Tested with Druid 0.9.0, Hadoop 2.7.2 and gcs-connector jar 1.4.4-hadoop2.
<a name="firehose"></a>
## Native batch ingestion
This firehose ingests events from a predefined list of files in HDFS.
This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
Since each split represents an HDFS file, each worker task of `index_parallel` will read one file.
Sample spec:
```json
"firehose" : {
"type" : "hdfs",
"paths": "/foo/bar,/foo/baz"
}
```
This firehose provides caching and prefetching features. During native batch indexing, a firehose can be read twice if
`intervals` are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scanning
of files is slow.
|Property|Description|Default|
|--------|-----------|-------|
|type|This should be `hdfs`.|none (required)|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|none (required)|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
|prefetchTriggerBytes|Threshold to trigger prefetching HDFS files.|maxFetchCapacityBytes / 2|
|fetchTimeout|Timeout for fetching each file.|60000|
|maxFetchRetry|Maximum number of retries for fetching each file.|3|
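
As an illustration, a spec using a JSON array of paths with wildcards and explicit caching/prefetch settings; the paths are hypothetical and the property values are only examples:

```json
"firehose" : {
  "type" : "hdfs",
  "paths" : ["/logs/2019-10-*/*.gz", "/archive/events.json"],
  "maxCacheCapacityBytes" : 1073741824,
  "maxFetchCapacityBytes" : 1073741824,
  "prefetchTriggerBytes" : 536870912,
  "fetchTimeout" : 60000,
  "maxFetchRetry" : 3
}
```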

View File

@ -63,13 +63,14 @@ As in the single phase execution, the created segments are reported to the super
To use this task, the `firehose` in `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set to a value larger than 1 in `tuningConfig`.
Otherwise, this task runs sequentially. Here is the list of currently splittable firehoses; a minimal spec sketch follows the list.
- [`LocalFirehose`](#local-firehose)
- [`IngestSegmentFirehose`](#segment-firehose)
- [`HttpFirehose`](#http-firehose)
- [`StaticS3Firehose`](../development/extensions-core/s3.md#firehose)
- [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.md#firehose)
- [`StaticGoogleBlobStoreFirehose`](../development/extensions-core/google.md#firehose)
- [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.md#firehose)
- [`local`](#local-firehose)
- [`ingestSegment`](#segment-firehose)
- [`http`](#http-firehose)
- [`s3`](../development/extensions-core/s3.md#firehose)
- [`hdfs`](../development/extensions-core/hdfs.md#firehose)
- [`static-azure-blobstore`](../development/extensions-contrib/azure.md#firehose)
- [`static-google-blobstore`](../development/extensions-core/google.md#firehose)
- [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)
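
For instance, a minimal sketch of an `index_parallel` spec fragment combining a splittable firehose with `maxNumConcurrentSubTasks`; the path and concurrency value are hypothetical:

```json
"ioConfig" : {
  "type" : "index_parallel",
  "firehose" : {
    "type" : "hdfs",
    "paths" : "/data/wikipedia/*.json"
  }
},
"tuningConfig" : {
  "type" : "index_parallel",
  "maxNumConcurrentSubTasks" : 4
}
```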
The splittable firehose is responsible for generating _splits_. The supervisor task generates _worker task specs_ containing a split
and submits worker tasks using those specs. As a result, the number of worker tasks depends on

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.firehose.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
{
private final List<String> inputPaths;
private final Configuration conf;
@JsonCreator
public HdfsFirehoseFactory(
@JacksonInject Configuration conf,
@JsonProperty("paths") Object inputPaths,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
@JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
@JsonProperty("fetchTimeout") Long fetchTimeout,
@JsonProperty("maxFetchRetry") Integer maxFetchRetry
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.conf = conf;
// Coerce 'inputPaths' to List<String>
if (inputPaths instanceof String) {
this.inputPaths = Collections.singletonList((String) inputPaths);
} else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
this.inputPaths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
} else {
throw new IAE("'inputPaths' must be a string or an array of strings");
}
}
@JsonProperty("paths")
public List<String> getInputPaths()
{
return inputPaths;
}
@Override
protected Collection<Path> initObjects() throws IOException
{
// Use TextInputFormat to read splits. To do this, we need to make a fake Job.
final Job job = Job.getInstance(conf);
// Add paths to the fake JobContext.
inputPaths.forEach(input -> {
try {
FileInputFormat.addInputPaths(job, input);
}
catch (IOException e) {
throw new RuntimeException(e);
}
});
return new TextInputFormat().getSplits(job)
.stream()
.map(split -> ((FileSplit) split).getPath())
.collect(Collectors.toSet());
}
@Override
protected InputStream openObjectStream(Path path) throws IOException
{
return path.getFileSystem(conf).open(path);
}
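// Open the file and seek to 'start' so a partially read object can be resumed after a retry.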
@Override
protected InputStream openObjectStream(Path path, long start) throws IOException
{
final FSDataInputStream in = path.getFileSystem(conf).open(path);
in.seek(start);
return in;
}
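// Decompress the stream based on the file name extension (e.g. .gz), so compressed files can be read transparently.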
@Override
protected InputStream wrapObjectStream(Path path, InputStream stream) throws IOException
{
return CompressionUtils.decompress(stream, path.getName());
}
@Override
protected Predicate<Throwable> getRetryCondition()
{
return HdfsDataSegmentPuller.RETRY_PREDICATE;
}
@Override
public boolean isSplittable()
{
return true;
}
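// Return a copy of this factory that reads only the single file in the given split; used by parallel index worker tasks.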
@Override
public FiniteFirehoseFactory<StringInputRowParser, Path> withSplit(InputSplit<Path> split)
{
return new HdfsFirehoseFactory(
conf,
split.get().toString(),
getMaxCacheCapacityBytes(),
getMaxFetchCapacityBytes(),
getPrefetchTriggerBytes(),
getFetchTimeout(),
getMaxFetchRetry()
);
}
@Override
public String toString()
{
return "HdfsFirehoseFactory{" +
"inputPaths=" + inputPaths +
'}';
}
}

View File

@ -49,11 +49,30 @@ import java.io.Writer;
import java.net.URI;
/**
*
*/
public class HdfsDataSegmentPuller implements URIDataPuller
{
public static final int DEFAULT_RETRY_COUNT = 3;
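// Retry when an HdfsIOException or IOException appears anywhere in the cause chain.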
public static final Predicate<Throwable> RETRY_PREDICATE = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
if (input == null) {
return false;
}
if (input instanceof HdfsIOException) {
return true;
}
if (input instanceof IOException) {
return true;
}
return apply(input.getCause());
}
};
/**
* FileObject.getLastModified and FileObject.delete don't throw IOException. This allows us to wrap those calls
*/
@ -310,22 +329,6 @@ public class HdfsDataSegmentPuller implements URIDataPuller
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
if (input == null) {
return false;
}
if (input instanceof HdfsIOException) {
return true;
}
if (input instanceof IOException) {
return true;
}
return apply(input.getCause());
}
};
return RETRY_PREDICATE;
}
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.storage.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
@ -30,14 +29,10 @@ import org.apache.hadoop.fs.Path;
import java.io.File;
/**
*
*/
@JsonTypeName(HdfsStorageDruidModule.SCHEME)
public class HdfsLoadSpec implements LoadSpec
{
private final Path path;
final HdfsDataSegmentPuller puller;
private final HdfsDataSegmentPuller puller;
@JsonCreator
public HdfsLoadSpec(

View File

@ -19,13 +19,14 @@
package org.apache.druid.storage.hdfs;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.data.SearchableVersionedDataFinder;
import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@ -38,10 +39,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
*
*/
public class HdfsStorageDruidModule implements DruidModule
{
@ -57,27 +60,11 @@ public class HdfsStorageDruidModule implements DruidModule
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "DruidHDFSStorage-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(HdfsLoadSpec.class);
}
}
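// Register the load spec and the firehose factory under the "hdfs" type name for polymorphic JSON deserialization.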
return Collections.singletonList(
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME)
)
);
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.firehose.hdfs;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
public class HdfsFirehoseFactoryTest
{
@Test
public void testArrayPaths() throws IOException
{
final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
null,
Collections.singletonList("/foo/bar"),
null,
null,
null,
null,
null
);
final ObjectMapper mapper = createMapper();
final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
mapper.readValue(mapper.writeValueAsString(firehoseFactory), FirehoseFactory.class);
Assert.assertEquals(
firehoseFactory.getInputPaths(),
firehoseFactory2.getInputPaths()
);
}
@Test
public void testStringPaths() throws IOException
{
final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null);
final ObjectMapper mapper = createMapper();
final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
mapper.readValue(mapper.writeValueAsString(firehoseFactory), FirehoseFactory.class);
Assert.assertEquals(
firehoseFactory.getInputPaths(),
firehoseFactory2.getInputPaths()
);
}
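// The factory receives the Hadoop Configuration via @JacksonInject, so the test mapper must supply one as an injectable value.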
private static ObjectMapper createMapper()
{
final ObjectMapper mapper = new ObjectMapper();
new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
return mapper;
}
}

View File

@ -368,6 +368,7 @@ whitelist
whitelisted
whitespace
wildcard
wildcards
xml
znode
znodes