From 11b7b1bea6788bcce6a720f3038623d2ea901fe5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 26 May 2017 06:13:04 +0900 Subject: [PATCH] Add support for HttpFirehose (#4297) * Add support for HttpFirehose * Fix document * Add documents --- .../PrefetchableTextFilesFirehoseFactory.java | 49 +++++++++--- .../development/extensions-contrib/azure.md | 4 +- .../extensions-contrib/cloudfiles.md | 4 +- .../development/extensions-contrib/google.md | 4 +- .../content/development/extensions-core/s3.md | 4 +- docs/content/ingestion/firehose.md | 22 ++++++ .../java/io/druid/guice/FirehoseModule.java | 2 + .../firehose/HttpFirehoseFactory.java | 74 +++++++++++++++++++ 8 files changed, 146 insertions(+), 17 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java diff --git a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java index 4baacbcf246..b5bc768870f 100644 --- a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java +++ b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java @@ -57,20 +57,51 @@ import java.util.concurrent.atomic.AtomicLong; * by this class provides three key functionalities. * * * * This implementation can be useful when the cost for reading input objects is large as reading from AWS S3 because * IndexTask can read the whole data twice for determining partition specs and generating segments if the intervals of * GranularitySpec is not specified. + * + * Prefetching can be turned on/off by setting {@link #maxFetchCapacityBytes}. Depending on prefetching is enabled or + * disabled, the behavior of the firehose is different like below. + * + *
+ * <ol>
+ * <li>
+ * If prefetch is enabled, PrefetchableTextFilesFirehose can fetch input objects in background.
+ * </li>
+ * <li>
+ * When next() is called, it first checks whether there are already fetched files in local storage.
+ * <ol>
+ * <li>
+ * If such a file exists, it simply chooses a fetched file and returns a {@link LineIterator} reading that file.
+ * </li>
+ * <li>
+ * If there are no fetched files in local storage but some objects still remain to be read, the firehose
+ * fetches one of the input objects in background immediately. If an IOException occurs while downloading the
+ * object, it retries up to the maximum retry count. Finally, the firehose returns a {@link LineIterator} only
+ * when the download operation has finished successfully.
+ * </li>
+ * </ol>
+ * </li>
+ * <li>
+ * If prefetch is disabled, the firehose returns a {@link LineIterator} which directly reads the stream opened by
+ * {@link #openObjectStream}. If an IOException occurs, it is thrown and the read fails.
+ * </li>
+ * </ol>
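+ *
+ * <p>
+ * For illustration only (a sketch, not part of this class's contract): a concrete subclass such as
+ * {@code HttpFirehoseFactory} can turn prefetching off by passing 0 for maxFetchCapacityBytes; the nulls
+ * below presumably fall back to the documented defaults, and the example URI is arbitrary.
+ * </p>
+ *
+ * <pre>{@code
+ * new HttpFirehoseFactory(
+ *     Collections.singletonList(URI.create("http://example.com/uri1")),
+ *     null,  // maxCacheCapacityBytes: use the default
+ *     0L,    // maxFetchCapacityBytes: 0 disables prefetch
+ *     null,  // prefetchTriggerBytes: use the default
+ *     null,  // fetchTimeout: use the default
+ *     null   // maxFetchRetry: use the default
+ * );
+ * }</pre>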
*/ public abstract class PrefetchableTextFilesFirehoseFactory extends AbstractTextFilesFirehoseFactory diff --git a/docs/content/development/extensions-contrib/azure.md b/docs/content/development/extensions-contrib/azure.md index cb470967765..3a3946accf4 100644 --- a/docs/content/development/extensions-contrib/azure.md +++ b/docs/content/development/extensions-contrib/azure.md @@ -58,8 +58,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch |--------|-----------|-------|---------| |type|This should be `static-azure-blobstore`.|N/A|yes| |blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes| -|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no| -|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no| |prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no| |fetchTimeout|Timeout for fetching an Azure object.|60000|no| |maxFetchRetry|Maximum retry for fetching an Azure object.|3|no| diff --git a/docs/content/development/extensions-contrib/cloudfiles.md b/docs/content/development/extensions-contrib/cloudfiles.md index d11a9a0ec82..954179d5fbd 100644 --- a/docs/content/development/extensions-contrib/cloudfiles.md +++ b/docs/content/development/extensions-contrib/cloudfiles.md @@ -59,8 +59,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch |type|This should be `static-cloudfiles`.|N/A|yes| |blobs|JSON array of Cloud Files blobs.|N/A|yes| |maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no| -|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no| -|prefetchTriggerBytes|Threshold to trigger prefetching Cloud Files objects.|maxFetchCapacityBytes / 2|no| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no| |fetchTimeout|Timeout for fetching a Cloud Files object.|60000|no| |maxFetchRetry|Maximum retry for fetching a Cloud Files object.|3|no| diff --git a/docs/content/development/extensions-contrib/google.md b/docs/content/development/extensions-contrib/google.md index 0852b886ceb..7aa76472882 100644 --- a/docs/content/development/extensions-contrib/google.md +++ b/docs/content/development/extensions-contrib/google.md @@ -48,8 +48,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch |--------|-----------|-------|---------| |type|This should be `static-google-blobstore`.|N/A|yes| |blobs|JSON array of Google Blobs.|N/A|yes| -|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no| -|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 
0 means disabling prefetch.|1073741824|no| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no| |prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no| |fetchTimeout|Timeout for fetching a Google Blob.|60000|no| |maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no| diff --git a/docs/content/development/extensions-core/s3.md b/docs/content/development/extensions-core/s3.md index 6a804f95997..f24a406946b 100644 --- a/docs/content/development/extensions-core/s3.md +++ b/docs/content/development/extensions-core/s3.md @@ -40,8 +40,8 @@ shardSpecs are not specified, and, in this case, caching can be useful. Prefetch |type|This should be `static-s3`.|N/A|yes| |uris|JSON array of URIs where s3 files to be ingested are located.|N/A|`uris` or `prefixes` must be set| |prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|N/A|`uris` or `prefixes` must be set| -|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no| -|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch.|1073741824|no| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no| |prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no| |fetchTimeout|Timeout for fetching an s3 object.|60000|no| |maxFetchRetry|Maximum retry for fetching an s3 object.|3|no| \ No newline at end of file diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index 91d594ec8c1..aec3ea15f62 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -36,6 +36,28 @@ A sample local firehose spec is shown below: |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes| |baseDir|directory to search recursively for files to be ingested. |yes| +#### HttpFirehose + +This Firehose can be used to read the data from remote sites via HTTP. +A sample http firehose spec is shown below: + +```json +{ + "type" : "http", + "uris" : ["http://example.com/uri1", "http://example2.com/uri2"] +} +``` + +The below configurations can be optionally used for tuning the firehose performance. + +|property|description|default| +|--------|-----------|-------| +|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824| +|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. 
Prefetched files are removed immediately once they are read.|1073741824| +|prefetchTriggerBytes|Threshold to trigger prefetching http objects.|maxFetchCapacityBytes / 2| +|fetchTimeout|Timeout for fetching a http object.|60000| +|maxFetchRetry|Maximum retry for fetching a http object.|3| + #### IngestSegmentFirehose This Firehose can be used to read the data from existing druid segments. diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index dd8a605af96..4678edbfe4d 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -28,6 +28,7 @@ import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory; +import io.druid.segment.realtime.firehose.HttpFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; @@ -54,6 +55,7 @@ public class FirehoseModule implements DruidModule new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(LocalFirehoseFactory.class, "local"), + new NamedType(HttpFirehoseFactory.class, "http"), new NamedType(EventReceiverFirehoseFactory.class, "receiver"), new NamedType(CombiningFirehoseFactory.class, "combining"), new NamedType(FixedCountFirehoseFactory.class, "fixedCount") diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java new file mode 100644 index 00000000000..4a604ae7f93 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.impl.PrefetchableTextFilesFirehoseFactory; +import io.druid.java.util.common.CompressionUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collection; +import java.util.List; + +public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory +{ + private final List uris; + + @JsonCreator + public HttpFirehoseFactory( + @JsonProperty("uris") List uris, + @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, + @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, + @JsonProperty("prefetchTriggerBytes") Long prefetchTriggerBytes, + @JsonProperty("fetchTimeout") Long fetchTimeout, + @JsonProperty("maxFetchRetry") Integer maxFetchRetry + ) + { + super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); + this.uris = uris; + } + + @JsonProperty + public List getUris() + { + return uris; + } + + @Override + protected Collection initObjects() + { + return uris; + } + + @Override + protected InputStream openObjectStream(URI object) throws IOException + { + return object.toURL().openConnection().getInputStream(); + } + + @Override + protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException + { + return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + } +}