diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java new file mode 100644 index 00000000000..1f16ce1c49b --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java @@ -0,0 +1,65 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.cloudfiles; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; + +public class CloudFilesBlob +{ + @JsonProperty + @NotNull + private String container = null; + + @JsonProperty + @NotNull + private String path = null; + + @JsonProperty + @NotNull + private String region = null; + + public CloudFilesBlob() + { + } + + public CloudFilesBlob(String container, String path, String region) + { + this.container = container; + this.path = path; + this.region = region; + } + + public String getContainer() + { + return container; + } + + public String getPath() + { + return path; + } + + public String getRegion() + { + return region; + } +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesFirehoseDruidModule.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesFirehoseDruidModule.java new file mode 100644 index 00000000000..99300da94a0 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesFirehoseDruidModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.cloudfiles; + +import java.util.List; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; + +import io.druid.initialization.DruidModule; + +public class CloudFilesFirehoseDruidModule implements DruidModule +{ + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule().registerSubtypes( + new NamedType(StaticCloudFilesFirehoseFactory.class, "static-cloudfiles"))); + } + + @Override + public void configure(Binder arg0) + { + + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java new file mode 100644 index 00000000000..c3d1c0c7ad1 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -0,0 +1,138 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.cloudfiles; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.metamx.common.CompressionUtils; +import com.metamx.common.logger.Logger; +import com.metamx.common.parsers.ParseException; + +import io.druid.data.input.impl.FileIteratingFirehose; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; + +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.storage.cloudfiles.CloudFilesByteSource; +import io.druid.storage.cloudfiles.CloudFilesObjectApiProxy; + +public class StaticCloudFilesFirehoseFactory implements FirehoseFactory +{ + private static final Logger log = new Logger(StaticCloudFilesFirehoseFactory.class); + + private final CloudFilesApi cloudFilesApi; + private final List blobs; + + @JsonCreator + public StaticCloudFilesFirehoseFactory( + @JacksonInject("objectApi") CloudFilesApi cloudFilesApi, + @JsonProperty("blobs") CloudFilesBlob[] blobs) + { + this.cloudFilesApi = cloudFilesApi; + this.blobs = ImmutableList.copyOf(blobs); + } + + @JsonProperty + public List getBlobs() + { + return blobs; + } + + @Override + public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException, ParseException + { + Preconditions.checkNotNull(cloudFilesApi, "null cloudFilesApi"); + + final LinkedList objectQueue = Lists.newLinkedList(blobs); + + return new FileIteratingFirehose( + new Iterator() + { + + @Override + public boolean hasNext() + { + return !objectQueue.isEmpty(); + } + + @Override + public LineIterator next() + { + final CloudFilesBlob nextURI = objectQueue.poll(); + + final String region = nextURI.getRegion(); + final String container = nextURI.getContainer(); + final String path = nextURI.getPath(); + + log.info("Retrieving file from region[%s], container[%s] and path [%s]", + region, container, path); + CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy( + cloudFilesApi, region, container); + final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); + + try + { + final InputStream innerInputStream = byteSource.openStream(); + final InputStream outerInputStream = path.endsWith(".gz") + ? CompressionUtils.gzipInputStream(innerInputStream) + : innerInputStream; + + return IOUtils.lineIterator( + new BufferedReader( + new InputStreamReader(outerInputStream, Charsets.UTF_8))); + } catch (IOException e) + { + log.error(e, + "Exception opening container[%s] blob[%s] from region[%s]", + container, path, region); + + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + }, + stringInputRowParser); + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index 184f8677e4d..19e3d19adf8 100644 --- a/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -18,3 +18,4 @@ # io.druid.storage.cloudfiles.CloudFilesStorageDruidModule +io.druid.firehose.cloudfiles.CloudFilesFirehoseDruidModule