diff --git a/docs/development/extensions-core/hdfs.md b/docs/development/extensions-core/hdfs.md
index d069e569274..900d9295661 100644
--- a/docs/development/extensions-core/hdfs.md
+++ b/docs/development/extensions-core/hdfs.md
@@ -27,7 +27,7 @@ To use this Apache Druid (incubating) extension, make sure to [include](../../de
 
 ## Deep Storage
 
-### Configuration
+### Configuration for HDFS
 
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
@@ -39,12 +39,10 @@ To use this Apache Druid (incubating) extension, make sure to [include](../../de
 If you are using the Hadoop indexer, set your output directory to be a location on Hadoop and it will work.
 If you want to eagerly authenticate against a secured hadoop/hdfs cluster you must set `druid.hadoop.security.kerberos.principal` and `druid.hadoop.security.kerberos.keytab`, this is an alternative to the cron job method that runs `kinit` command periodically.
 
-## Google Cloud Storage
+### Configuration for Google Cloud Storage
 
 The HDFS extension can also be used for GCS as deep storage.
 
-### Configuration
-
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
 |`druid.storage.type`|hdfs||Must be set.|
@@ -53,3 +51,34 @@ The HDFS extension can also be used for GCS as deep storage.
 All services that need to access GCS need to have the [GCS connector jar](https://cloud.google.com/hadoop/google-cloud-storage-connector#manualinstallation) in their class path. One option is to place this jar in <druid>/lib/ and <druid>/extensions/druid-hdfs-storage/
 
 Tested with Druid 0.9.0, Hadoop 2.7.2 and gcs-connector jar 1.4.4-hadoop2.
+
+
+
+## Native batch ingestion
+
+This firehose ingests events from a predefined list of HDFS files.
+This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
+Since each split represents an HDFS file, each worker task of `index_parallel` will read a single file.
+
+Sample spec:
+
+```json
+"firehose" : {
+    "type" : "hdfs",
+    "paths": "/foo/bar,/foo/baz"
+}
+```
+
+This firehose provides caching and prefetching features. During native batch indexing, a firehose can be read twice if
+`intervals` are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scanning
+of files is slow.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|type|This should be `hdfs`.|none (required)|
+|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|none (required)|
+|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
+|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
+|prefetchTriggerBytes|Threshold to trigger prefetching HDFS files.|maxFetchCapacityBytes / 2|
+|fetchTimeout|Timeout for fetching each file.|60000|
+|maxFetchRetry|Maximum number of retries for fetching each file.|3|
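+
+For example, the following spec reads every file under a hypothetical `/druid/data` directory and tunes the caching and
+prefetching behavior described above (the path and the values shown are purely illustrative):
+
+```json
+"firehose" : {
+    "type" : "hdfs",
+    "paths": "/druid/data/*",
+    "maxCacheCapacityBytes": 1073741824,
+    "maxFetchCapacityBytes": 1073741824,
+    "prefetchTriggerBytes": 536870912,
+    "fetchTimeout": 60000
+}
+```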
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index b60121887b0..428df3eff2b 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -63,13 +63,14 @@ As in the single phase execution, the created segments are reported to the super
 To use this task, the `firehose` in `ioConfig` should be _splittable_ and `maxNumConcurrentSubTasks` should be set
 something larger than 1 in `tuningConfig`. Otherwise, this task runs sequentially.
 Here is the list of currently splittable firehoses.
-- [`LocalFirehose`](#local-firehose)
-- [`IngestSegmentFirehose`](#segment-firehose)
-- [`HttpFirehose`](#http-firehose)
-- [`StaticS3Firehose`](../development/extensions-core/s3.md#firehose)
-- [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.md#firehose)
-- [`StaticGoogleBlobStoreFirehose`](../development/extensions-core/google.md#firehose)
-- [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.md#firehose)
+- [`local`](#local-firehose)
+- [`ingestSegment`](#segment-firehose)
+- [`http`](#http-firehose)
+- [`s3`](../development/extensions-core/s3.md#firehose)
+- [`hdfs`](../development/extensions-core/hdfs.md#native-batch-ingestion)
+- [`static-azure-blobstore`](../development/extensions-contrib/azure.md#firehose)
+- [`static-google-blobstore`](../development/extensions-core/google.md#firehose)
+- [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)
 
 The splittable firehose is responsible for generating _splits_. The supervisor task generates _worker task specs_
 containing a split and submits worker tasks using those specs. As a result, the number of worker tasks depends on
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
new file mode 100644
index 00000000000..8fe0eca1fad
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.firehose.hdfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Predicate;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
+import org.apache.druid.utils.CompressionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
+{
+  private final List<String> inputPaths;
+  private final Configuration conf;
+
+  @JsonCreator
+  public HdfsFirehoseFactory(
+      @JacksonInject Configuration conf,
+      @JsonProperty("paths") Object inputPaths,
+      @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
+      @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
+      @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes,
+      @JsonProperty("fetchTimeout") Long fetchTimeout,
+      @JsonProperty("maxFetchRetry") Integer maxFetchRetry
+  )
+  {
+    super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
+    this.conf = conf;
+
+    // Coerce 'inputPaths' to List<String>
+    if (inputPaths instanceof String) {
+      this.inputPaths = Collections.singletonList((String) inputPaths);
+    } else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
+      this.inputPaths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+    } else {
+      throw new IAE("'inputPaths' must be a string or an array of strings");
+    }
+  }
+
+  @JsonProperty("paths")
+  public List<String> getInputPaths()
+  {
+    return inputPaths;
+  }
+
+  @Override
+  protected Collection<Path> initObjects() throws IOException
+  {
+    // Use TextInputFormat to read splits. To do this, we need to make a fake Job.
+    final Job job = Job.getInstance(conf);
+
+    // Add paths to the fake JobContext.
+    inputPaths.forEach(input -> {
+      try {
+        FileInputFormat.addInputPaths(job, input);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    return new TextInputFormat().getSplits(job)
+                                .stream()
+                                .map(split -> ((FileSplit) split).getPath())
+                                .collect(Collectors.toSet());
+  }
+
+  @Override
+  protected InputStream openObjectStream(Path path) throws IOException
+  {
+    return path.getFileSystem(conf).open(path);
+  }
+
+  @Override
+  protected InputStream openObjectStream(Path path, long start) throws IOException
+  {
+    final FSDataInputStream in = path.getFileSystem(conf).open(path);
+    in.seek(start);
+    return in;
+  }
+
+  @Override
+  protected InputStream wrapObjectStream(Path path, InputStream stream) throws IOException
+  {
+    return CompressionUtils.decompress(stream, path.getName());
+  }
+
+  @Override
+  protected Predicate<Throwable> getRetryCondition()
+  {
+    return HdfsDataSegmentPuller.RETRY_PREDICATE;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return true;
+  }
+
+  @Override
+  public FiniteFirehoseFactory<StringInputRowParser, Path> withSplit(InputSplit<Path> split)
+  {
+    return new HdfsFirehoseFactory(
+        conf,
+        split.get().toString(),
+        getMaxCacheCapacityBytes(),
+        getMaxFetchCapacityBytes(),
+        getPrefetchTriggerBytes(),
+        getFetchTimeout(),
+        getMaxFetchRetry()
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "HdfsFirehoseFactory{" +
+           "inputPaths=" + inputPaths +
+           '}';
+  }
+}
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
index 5dd29ad5ff3..1a526c0e239 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
@@ -49,11 +49,30 @@ import java.io.Writer;
 import java.net.URI;
 
 /**
+ *
  */
 public class HdfsDataSegmentPuller implements URIDataPuller
 {
   public static final int DEFAULT_RETRY_COUNT = 3;
 
+  public static final Predicate<Throwable> RETRY_PREDICATE = new Predicate<Throwable>()
+  {
+    @Override
+    public boolean apply(Throwable input)
+    {
+      if (input == null) {
+        return false;
+      }
+      if (input instanceof HdfsIOException) {
+        return true;
+      }
+      if (input instanceof IOException) {
+        return true;
+      }
+      return apply(input.getCause());
+    }
+  };
+
   /**
    * FileObject.getLastModified and FileObject.delete don't throw IOException. This allows us to wrap those calls
   */
@@ -310,22 +329,6 @@ public class HdfsDataSegmentPuller implements URIDataPuller
   @Override
   public Predicate<Throwable> shouldRetryPredicate()
   {
-    return new Predicate<Throwable>()
-    {
-      @Override
-      public boolean apply(Throwable input)
-      {
-        if (input == null) {
-          return false;
-        }
-        if (input instanceof HdfsIOException) {
-          return true;
-        }
-        if (input instanceof IOException) {
-          return true;
-        }
-        return apply(input.getCause());
-      }
-    };
+    return RETRY_PREDICATE;
   }
 }
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsLoadSpec.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsLoadSpec.java
index e169ab23de1..00ac749954b 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsLoadSpec.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsLoadSpec.java
@@ -22,7 +22,6 @@ package org.apache.druid.storage.hdfs;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.segment.loading.LoadSpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -30,14 +29,10 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.File;
 
-/**
- *
- */
-@JsonTypeName(HdfsStorageDruidModule.SCHEME)
 public class HdfsLoadSpec implements LoadSpec
 {
   private final Path path;
-  final HdfsDataSegmentPuller puller;
+  private final HdfsDataSegmentPuller puller;
 
   @JsonCreator
   public HdfsLoadSpec(
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 95cd12f7a81..5f5a1d0cf44 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -19,13 +19,14 @@
 
 package org.apache.druid.storage.hdfs;
 
-import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.Module;
-import com.google.common.collect.ImmutableList;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.multibindings.MapBinder;
 import org.apache.druid.data.SearchableVersionedDataFinder;
+import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
@@ -38,10 +39,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
 /**
+ *
  */
 public class HdfsStorageDruidModule implements DruidModule
 {
@@ -57,27 +60,11 @@ public class HdfsStorageDruidModule implements DruidModule
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return ImmutableList.of(
-        new Module()
-        {
-          @Override
-          public String getModuleName()
-          {
-            return "DruidHDFSStorage-" + System.identityHashCode(this);
-          }
-
-          @Override
-          public Version version()
-          {
-            return Version.unknownVersion();
-          }
-
-          @Override
-          public void setupModule(SetupContext context)
-          {
-            context.registerSubtypes(HdfsLoadSpec.class);
-          }
-        }
+    return Collections.singletonList(
+        new SimpleModule().registerSubtypes(
+            new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
+            new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME)
+        )
     );
   }
 
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
new file mode 100644
index 00000000000..88daed7a821
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.firehose.hdfs;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class HdfsFirehoseFactoryTest
+{
+  @Test
+  public void testArrayPaths() throws IOException
+  {
+    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(
+        null,
+        Collections.singletonList("/foo/bar"),
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    final ObjectMapper mapper = createMapper();
+
+    final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
+        mapper.readValue(mapper.writeValueAsString(firehoseFactory), FirehoseFactory.class);
+
+    Assert.assertEquals(
+        firehoseFactory.getInputPaths(),
+        firehoseFactory2.getInputPaths()
+    );
+  }
+
+  @Test
+  public void testStringPaths() throws IOException
+  {
+    final HdfsFirehoseFactory firehoseFactory = new HdfsFirehoseFactory(null, "/foo/bar", null, null, null, null, null);
+    final ObjectMapper mapper = createMapper();
+
+    final HdfsFirehoseFactory firehoseFactory2 = (HdfsFirehoseFactory)
+        mapper.readValue(mapper.writeValueAsString(firehoseFactory), FirehoseFactory.class);
+
+    Assert.assertEquals(
+        firehoseFactory.getInputPaths(),
+        firehoseFactory2.getInputPaths()
+    );
+  }
+
+  private static ObjectMapper createMapper()
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+    new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
+    mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+    return mapper;
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index ecc56db475c..f74eb0eb69e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -368,6 +368,7 @@ whitelist
 whitelisted
 whitespace
 wildcard
+wildcards
 xml
 znode
 znodes