WIP: Add Google Storage support (#2458)

Also excludes the correct artifacts from #2741
This commit is contained in:
Erik Dubbelboer 2016-11-16 16:36:45 +08:00 committed by Nishant
parent 607f64376c
commit 7d36f540e8
26 changed files with 1811 additions and 8 deletions

View File

@ -19,11 +19,11 @@ The indexing service uses several of the global configs in [Configuration](../co
#### Task Logging #### Task Logging
If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store or HDFS. If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|
|`druid.indexer.logs.type`|Choices:noop, s3, azure, hdfs, file. Where to store task logs|file| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file|
##### File Task Logs ##### File Task Logs
@ -52,6 +52,16 @@ Note: this uses the same storage account as the deep storage module for azure.
|`druid.indexer.logs.container`|The Azure Blob Store container to write logs to|none| |`druid.indexer.logs.container`|The Azure Blob Store container to write logs to|none|
|`druid.indexer.logs.prefix`|The path to prepend to logs|none| |`druid.indexer.logs.prefix`|The path to prepend to logs|none|
#### Google Cloud Storage Task Logs
Store task logs in Google Cloud Storage.
Note: this uses the same storage settings as the deep storage module for google.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.bucket`|The Google Cloud Storage bucket to write logs to|none|
|`druid.indexer.logs.prefix`|The path to prepend to logs|none|
##### HDFS Task Logs ##### HDFS Task Logs
Store task logs in HDFS. Store task logs in HDFS.

View File

@ -0,0 +1,55 @@
---
layout: doc_page
---
# Google Cloud Storage
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-google-extensions` extension.
## Deep Storage
[Google Cloud Storage](https://cloud.google.com/storage/) is another option for deep storage. This requires some additional druid configuration.
|Property|Description|Default|Required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud bucket|N/A|yes|
|path|The path where data is located.|N/A|yes|
## Firehose
#### StaticGoogleBlobStoreFirehose
This firehose ingests events, similar to the StaticS3Firehose, but from an Google Cloud Store.
As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
Sample spec:
```json
"firehose" : {
"type" : "static-google-blobstore",
"blobs": [
{
"bucket": "foo",
"path": "/path/to/your/file.json"
},
{
"container": "bar",
"path": "/another/path.json"
}
]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "static-google-blobstore".|N/A|yes|
|blobs|JSON array of Google Blobs.|N/A|yes|
Google Blobs:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud bucket|N/A|yes|
|path|The path where data is located.|N/A|yes|

View File

@ -59,6 +59,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)| |druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)|
|druid-google-extensions|Google Cloud Storage deep storage.|[link](../development/extensions-contrib/google.html)|
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|

View File

@ -49,7 +49,7 @@ If your jar has this file, then when it is added to the classpath or as an exten
### Adding a new deep storage implementation ### Adding a new deep storage implementation
Check the `azure-storage`, `cassandra-storage`, `hdfs-storage` and `s3-extensions` modules for examples of how to do this. Check the `azure-storage`, `google-storage`, `cassandra-storage`, `hdfs-storage` and `s3-extensions` modules for examples of how to do this.
The basic idea behind the extension is that you need to add bindings for your DataSegmentPusher and DataSegmentPuller objects. The way to add them is something like (taken from HdfsStorageDruidModule) The basic idea behind the extension is that you need to add bindings for your DataSegmentPusher and DataSegmentPuller objects. The way to add them is something like (taken from HdfsStorageDruidModule)

View File

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-google-extensions</artifactId>
<name>druid-google-extensions</name>
<description>druid-google-extensions</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-storage</artifactId>
<version>v1-rev79-1.22.0</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-jackson2</artifactId>
<version>1.22.0</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.google;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class GoogleBlob {
private final String bucket;
private final String path;
@JsonCreator
public GoogleBlob(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path) {
this.bucket = bucket;
this.path = path;
}
@JsonProperty
public String getBucket() {
return bucket;
}
@JsonProperty
public String getPath() {
return path;
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.google;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.storage.google.GoogleByteSource;
import io.druid.storage.google.GoogleStorage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
public class StaticGoogleBlobStoreFirehoseFactory implements FirehoseFactory<StringInputRowParser> {
private static final Logger LOG = new Logger(StaticGoogleBlobStoreFirehoseFactory.class);
private final GoogleStorage storage;
private final List<GoogleBlob> blobs;
@JsonCreator
public StaticGoogleBlobStoreFirehoseFactory(
@JacksonInject GoogleStorage storage,
@JsonProperty("blobs") GoogleBlob[] blobs
) {
this.storage = storage;
this.blobs = ImmutableList.copyOf(blobs);
}
@JsonProperty
public List<GoogleBlob> getBlobs() {
return blobs;
}
@Override
public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException {
Preconditions.checkNotNull(storage, "null storage");
final LinkedList<GoogleBlob> objectQueue = Lists.newLinkedList(blobs);
return new FileIteratingFirehose(
new Iterator<LineIterator>() {
@Override
public boolean hasNext() {
return !objectQueue.isEmpty();
}
@Override
public LineIterator next() {
final GoogleBlob nextURI = objectQueue.poll();
final String bucket = nextURI.getBucket();
final String path = nextURI.getPath().startsWith("/")
? nextURI.getPath().substring(1)
: nextURI.getPath();
try {
final InputStream innerInputStream = new GoogleByteSource(storage, bucket, path).openStream();
final InputStream outerInputStream = path.endsWith(".gz")
? CompressionUtils.gzipInputStream(innerInputStream)
: innerInputStream;
return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(outerInputStream, Charsets.UTF_8)
)
);
} catch (Exception e) {
LOG.error(e,
"Exception opening bucket[%s] blob[%s]",
bucket,
path
);
throw Throwables.propagate(e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
stringInputRowParser
);
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class GoogleAccountConfig
{
@JsonProperty
@NotNull
private String bucket;
@JsonProperty
private String prefix;
public void setBucket(String bucket)
{
this.bucket = bucket;
}
public void setPrefix(String prefix)
{
this.prefix = prefix;
}
public String getBucket()
{
return bucket;
}
public String getPrefix()
{
return prefix;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
public class GoogleByteSource extends ByteSource
{
private final GoogleStorage storage;
private final String bucket;
private final String path;
public GoogleByteSource(final GoogleStorage storage, final String bucket, final String path)
{
this.storage = storage;
this.bucket = bucket;
this.path = path;
}
@Override
public InputStream openStream() throws IOException
{
return storage.get(bucket, path);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.inject.Inject;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Map;
public class GoogleDataSegmentKiller implements DataSegmentKiller
{
private static final Logger LOG = new Logger(GoogleDataSegmentKiller.class);
private final GoogleStorage storage;
@Inject
public GoogleDataSegmentKiller(final GoogleStorage storage)
{
this.storage = storage;
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
LOG.info("Killing segment [%s]", segment);
Map<String, Object> loadSpec = segment.getLoadSpec();
final String bucket = MapUtils.getString(loadSpec, "bucket");
final String indexPath = MapUtils.getString(loadSpec, "path");
final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
try {
storage.delete(bucket, indexPath);
storage.delete(bucket, descriptorPath);
}
catch(IOException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e.getMessage());
}
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.common.base.Predicate;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;
public class GoogleDataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
private static final Logger LOG = new Logger(GoogleDataSegmentPuller.class);
private final GoogleStorage storage;
@Inject
public GoogleDataSegmentPuller(final GoogleStorage storage)
{
this.storage = storage;
}
@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final String bucket = MapUtils.getString(loadSpec, "bucket");
final String path = MapUtils.getString(loadSpec, "path");
getSegmentFiles(bucket, path, outDir);
}
public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path, File outDir)
throws SegmentLoadingException
{
LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath());
prepareOutDir(outDir);
try {
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
outDir,
GoogleUtils.GOOGLE_RETRY,
true
);
LOG.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
return result;
}
catch (Exception e) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(outDir);
}
catch (IOException ioe) {
LOG.warn(
ioe, "Failed to remove output directory [%s] for segment pulled from [%s]",
outDir.getAbsolutePath(), path
);
}
throw new SegmentLoadingException(e, e.getMessage());
}
}
// Needs to be public for the tests.
public void prepareOutDir(final File outDir) throws ISE
{
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
}
@Override
public InputStream getInputStream(URI uri) throws IOException
{
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
return storage.get(uri.getHost(), path);
}
@Override
public String getVersion(URI uri) throws IOException
{
String path = uri.getPath();
if (path.startsWith("/")) {
path = path.substring(1);
}
return storage.version(uri.getHost(), path);
}
@Override
public Predicate<Throwable> shouldRetryPredicate()
{
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e == null) {
return false;
}
if (GoogleUtils.GOOGLE_RETRY.apply(e)) {
return true;
}
// Look all the way down the cause chain, just in case something wraps it deep.
return apply(e.getCause());
}
};
}
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
public class GoogleDataSegmentPusher implements DataSegmentPusher
{
private static final Logger LOG = new Logger(GoogleDataSegmentPusher.class);
private final GoogleStorage storage;
private final GoogleAccountConfig config;
private final ObjectMapper jsonMapper;
@Inject
public GoogleDataSegmentPusher(
final GoogleStorage storage,
final GoogleAccountConfig config,
final ObjectMapper jsonMapper
)
{
this.storage = storage;
this.config = config;
this.jsonMapper = jsonMapper;
LOG.info("Configured Google as deep storage");
}
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public String getPathForHadoop()
{
return String.format("gs://%s/%s", config.getBucket(), config.getPrefix());
}
public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment)
throws IOException
{
File descriptorFile = File.createTempFile("descriptor", ".json");
try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
stream.write(jsonMapper.writeValueAsBytes(segment));
}
return descriptorFile;
}
public void insert(final File file, final String contentType, final String path) throws IOException {
LOG.info("Inserting [%s] to [%s]", file, path);
FileInputStream fileSteam = new FileInputStream(file);
InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
mediaContent.setLength(file.length());
storage.insert(config.getBucket(), path, mediaContent);
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
{
LOG.info("Uploading [%s] to Google.", indexFilesDir);
final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
File indexFile = null;
File descriptorFile = null;
try {
indexFile = File.createTempFile("index", ".zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
final String indexPath = buildPath(storageDir + "/" + "index.zip");
final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json");
final DataSegment outSegment = segment
.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type", GoogleStorageDruidModule.SCHEME,
"bucket", config.getBucket(),
"path", indexPath
)
)
.withBinaryVersion(version);
descriptorFile = createDescriptorFile(jsonMapper, outSegment);
insert(indexFile, "application/zip", indexPath);
insert(descriptorFile, "application/json", descriptorPath);
return outSegment;
}
catch (Exception e) {
throw Throwables.propagate(e);
} finally {
if (indexFile != null) {
LOG.info("Deleting file [%s]", indexFile);
indexFile.delete();
}
if (descriptorFile != null) {
LOG.info("Deleting file [%s]", descriptorFile);
descriptorFile.delete();
}
}
}
public String buildPath(final String path)
{
if (config.getPrefix() != "") {
return config.getPrefix() + "/" + path;
} else {
return path;
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import java.io.File;
@JsonTypeName(GoogleStorageDruidModule.SCHEME)
public class GoogleLoadSpec implements LoadSpec
{
@JsonProperty
private final String bucket;
@JsonProperty
private final String path;
private final GoogleDataSegmentPuller puller;
@JsonCreator
public GoogleLoadSpec(
@JsonProperty("bucket") String bucket,
@JsonProperty("path") String path,
@JacksonInject GoogleDataSegmentPuller puller
)
{
Preconditions.checkNotNull(bucket);
Preconditions.checkNotNull(path);
this.bucket = bucket;
this.path = path;
this.puller = puller;
}
@Override
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file).size());
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.services.storage.Storage;
import java.io.IOException;
import java.io.InputStream;
public class GoogleStorage
{
private final Storage storage;
public GoogleStorage(Storage storage)
{
this.storage = storage;
}
public void insert(final String bucket, final String path, AbstractInputStreamContent mediaContent) throws IOException
{
Storage.Objects.Insert insertObject = storage.objects().insert(bucket, null, mediaContent);
insertObject.setName(path);
insertObject.getMediaHttpUploader().setDirectUploadEnabled(false);
insertObject.execute();
}
public InputStream get(final String bucket, final String path) throws IOException
{
Storage.Objects.Get getObject = storage.objects().get(bucket, path);
getObject.getMediaHttpDownloader().setDirectDownloadEnabled(false);
return getObject.executeMediaAsInputStream();
}
public void delete(final String bucket, final String path) throws IOException
{
storage.objects().delete(bucket, path).execute();
}
public boolean exists(final String bucket, final String path)
{
try {
return storage.objects().get(bucket, path).executeUsingHead().isSuccessStatusCode();
} catch (Exception e) {
return false;
}
}
public long size(final String bucket, final String path) throws IOException
{
return storage.objects().get(bucket, path).execute().getSize().longValue();
}
public String version(final String bucket, final String path) throws IOException
{
return storage.objects().get(bucket, path).execute().getEtag();
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageScopes;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
import io.druid.firehose.google.StaticGoogleBlobStoreFirehoseFactory;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import io.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
public class GoogleStorageDruidModule implements DruidModule
{
private static final Logger LOG = new Logger(GoogleStorageDruidModule.class);
public static final String SCHEME = "google";
private static final String APPLICATION_NAME = "druid-google-extensions";
@Override
public List<? extends Module> getJacksonModules()
{
LOG.info("Getting jackson modules...");
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "Google-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(GoogleLoadSpec.class);
}
},
new SimpleModule().registerSubtypes(
new NamedType(StaticGoogleBlobStoreFirehoseFactory.class, "static-google-blobstore"))
);
}
@Override
public void configure(Binder binder)
{
LOG.info("Configuring GoogleStorageDruidModule...");
JsonConfigProvider.bind(binder, "druid.google", GoogleAccountConfig.class);
Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPuller.class)
.in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentPusher.class)
.in(LazySingleton.class);
Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(GoogleDataSegmentKiller.class)
.in(LazySingleton.class);
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(GoogleTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", GoogleTaskLogsConfig.class);
binder.bind(GoogleTaskLogs.class).in(LazySingleton.class);
}
@Provides
@LazySingleton
public GoogleStorage getGoogleStorage(final GoogleAccountConfig config)
throws IOException, GeneralSecurityException
{
LOG.info("Building Cloud Storage Client...");
HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
GoogleCredential credential = GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
if (credential.createScopedRequired()) {
credential = credential.createScoped(StorageScopes.all());
}
Storage storage = new Storage.Builder(httpTransport, jsonFactory, credential).setApplicationName(APPLICATION_NAME).build();
return new GoogleStorage(storage);
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import io.druid.java.util.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
public class GoogleTaskLogs implements TaskLogs {
private static final Logger LOG = new Logger(GoogleTaskLogs.class);
private final GoogleTaskLogsConfig config;
private final GoogleStorage storage;
@Inject
public GoogleTaskLogs(GoogleTaskLogsConfig config, GoogleStorage storage) {
this.config = config;
this.storage = storage;
}
@Override
public void pushTaskLog(final String taskid, final File logFile) throws IOException {
final String taskKey = getTaskLogKey(taskid);
LOG.info("Pushing task log %s to: %s", logFile, taskKey);
FileInputStream fileSteam = new FileInputStream(logFile);
InputStreamContent mediaContent = new InputStreamContent("text/plain", fileSteam);
mediaContent.setLength(logFile.length());
storage.insert(config.getBucket(), taskKey, mediaContent);
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException {
final String taskKey = getTaskLogKey(taskid);
try {
if (!storage.exists(config.getBucket(), taskKey)) {
return Optional.absent();
}
final long length = storage.size(config.getBucket(), taskKey);
return Optional.<ByteSource>of(
new ByteSource() {
@Override
public InputStream openStream() throws IOException {
try {
final long start;
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
InputStream stream = new GoogleByteSource(storage, config.getBucket(), taskKey).openStream();
stream.skip(start);
return stream;
} catch(Exception e) {
throw new IOException(e);
}
}
}
);
} catch (IOException e) {
throw new IOException(String.format("Failed to stream logs from: %s", taskKey), e);
}
}
private String getTaskLogKey(String taskid) {
return config.getPrefix() + "/" + taskid.replaceAll(":", "_");
}
@Override
public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class GoogleTaskLogsConfig {
@JsonProperty
@NotNull
private final String bucket;
@JsonProperty
@NotNull
private final String prefix;
public GoogleTaskLogsConfig(@JsonProperty("bucket") String bucket, @JsonProperty("prefix") String prefix) {
this.bucket = bucket;
this.prefix = prefix;
}
public String getBucket() {
return bucket;
}
public String getPrefix() {
return prefix;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.common.base.Predicate;
public class GoogleUtils
{
public static final Predicate<Throwable> GOOGLE_RETRY = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return false;
}
};
}

View File

@ -0,0 +1,20 @@
#
# Licensed to Metamarkets Group Inc. (Metamarkets) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Metamarkets licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
io.druid.storage.google.GoogleStorageDruidModule

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import org.easymock.EasyMockSupport;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.easymock.EasyMock.expect;
public class GoogleByteSourceTest extends EasyMockSupport
{
@Test
public void openStreamTest() throws IOException
{
final String bucket = "bucket";
final String path = "/path/to/file";
GoogleStorage storage = createMock(GoogleStorage.class);
InputStream stream = createMock(InputStream.class);
expect(storage.get(bucket, path)).andReturn(stream);
replayAll();
GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
byteSource.openStream();
verifyAll();
}
@Test(expected = IOException.class)
public void openStreamWithRecoverableErrorTest() throws IOException
{
final String bucket = "bucket";
final String path = "/path/to/file";
GoogleStorage storage = createMock(GoogleStorage.class);
expect(storage.get(bucket, path)).andThrow(new IOException(""));
replayAll();
GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
byteSource.openStream();
verifyAll();
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.common.collect.ImmutableMap;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.easymock.EasyMock.expectLastCall;
public class GoogleDataSegmentKillerTest extends EasyMockSupport
{
private static final String bucket = "bucket";
private static final String indexPath = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final DataSegment dataSegment = new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.<String, Object>of("bucket", bucket, "path", indexPath),
null,
null,
new NoneShardSpec(),
0,
1
);
private GoogleStorage storage;
@Before
public void before()
{
storage = createMock(GoogleStorage.class);
}
@Test
public void killTest() throws SegmentLoadingException, IOException
{
final String descriptorPath = indexPath.substring(0, indexPath.lastIndexOf("/")) + "/descriptor.json";
storage.delete(bucket, indexPath);
expectLastCall();
storage.delete(bucket, descriptorPath);
expectLastCall();
replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
killer.kill(dataSegment);
verifyAll();
}
@Test(expected = SegmentLoadingException.class)
public void killWithErrorTest() throws SegmentLoadingException, IOException
{
storage.delete(bucket, indexPath);
expectLastCall().andThrow(new IOException(""));
replayAll();
GoogleDataSegmentKiller killer = new GoogleDataSegmentKiller(storage);
killer.kill(dataSegment);
verifyAll();
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.FileUtils;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class GoogleDataSegmentPullerTest extends EasyMockSupport
{
private static final String bucket = "bucket";
private static final String path = "/path/to/storage/index.zip";
private static final DataSegment dataSegment = new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.<String, Object>of("bucket", bucket, "path", path),
null,
null,
NoneShardSpec.instance(),
0,
1
);
@Test(expected = SegmentLoadingException.class)
public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
throws IOException, SegmentLoadingException
{
final File outDir = Files.createTempDirectory("druid").toFile();
outDir.deleteOnExit();
GoogleStorage storage = createMock(GoogleStorage.class);
expect(storage.get(bucket, path)).andThrow(new IOException(""));
replayAll();
GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
puller.getSegmentFiles(bucket, path, outDir);
assertFalse(outDir.exists());
verifyAll();
}
@Test
public void getSegmentFilesTest() throws SegmentLoadingException
{
final File outDir = new File("");
final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class);
GoogleStorage storage = createMock(GoogleStorage.class);
GoogleDataSegmentPuller puller = createMockBuilder(GoogleDataSegmentPuller.class).withConstructor(
storage
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
expect(puller.getSegmentFiles(bucket, path, outDir)).andReturn(result);
replayAll();
puller.getSegmentFiles(dataSegment, outDir);
verifyAll();
}
@Test
public void prepareOutDirTest() throws IOException
{
GoogleStorage storage = createMock(GoogleStorage.class);
File outDir = Files.createTempDirectory("druid").toFile();
try {
GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
puller.prepareOutDir(outDir);
assertTrue(outDir.exists());
}
finally {
outDir.delete();
}
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import static org.easymock.EasyMock.expectLastCall;
public class GoogleDataSegmentPusherTest extends EasyMockSupport
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private static final String bucket = "bucket";
private static final String prefix = "prefix";
private static final String path = "prefix/test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final DataSegment dataSegment = new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.<String, Object>of("bucket", bucket, "path", path),
null,
null,
new NoneShardSpec(),
0,
1
);
private GoogleStorage storage;
private GoogleAccountConfig googleAccountConfig;
private ObjectMapper jsonMapper;
@Before
public void before()
{
storage = createMock(GoogleStorage.class);
googleAccountConfig = new GoogleAccountConfig();
googleAccountConfig.setBucket(bucket);
googleAccountConfig.setPrefix(prefix);
jsonMapper = new DefaultObjectMapper();
}
@Test
public void testPush() throws Exception
{
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
Files.write(data, tmp);
final long size = data.length;
DataSegment segmentToPush = new DataSegment(
"foo",
new Interval("2015/2016"),
"0",
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
size
);
GoogleDataSegmentPusher pusher = createMockBuilder(
GoogleDataSegmentPusher.class
).withConstructor(
storage,
googleAccountConfig,
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
final String storageDir = DataSegmentPusherUtil.getStorageDir(segmentToPush);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";
pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), EasyMock.eq(indexPath));
expectLastCall();
pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/json"), EasyMock.eq(descriptorPath));
expectLastCall();
replayAll();
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);
Assert.assertEquals(ImmutableMap.of(
"type",
GoogleStorageDruidModule.SCHEME,
"bucket",
bucket,
"path",
indexPath
), segment.getLoadSpec());
verifyAll();
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.google;
import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.StringWriter;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
public class GoogleTaskLogsTest extends EasyMockSupport {
private static final String bucket = "test";
private static final String prefix = "test/log";
private static final String taskid = "taskid";
private GoogleStorage storage;
private GoogleTaskLogs googleTaskLogs;
@Before
public void before() {
storage = createMock(GoogleStorage.class);
GoogleTaskLogsConfig config = new GoogleTaskLogsConfig(bucket, prefix);
googleTaskLogs = new GoogleTaskLogs(config, storage);
}
@Test
public void testPushTaskLog() throws Exception {
final File tmpDir = Files.createTempDir();
try {
final File logFile = new File(tmpDir, "log");
BufferedWriter output = new BufferedWriter(new FileWriter(logFile));
output.write("test");
output.close();
storage.insert(EasyMock.eq(bucket), EasyMock.eq(prefix + "/" + taskid), EasyMock.anyObject(InputStreamContent.class));
expectLastCall();
replayAll();
googleTaskLogs.pushTaskLog(taskid, logFile);
verifyAll();
} finally {
FileUtils.deleteDirectory(tmpDir);
}
}
@Test
public void testStreamTaskLogWithoutOffset() throws Exception {
final String testLog = "hello this is a log";
final String logPath = prefix + "/" + taskid;
expect(storage.exists(bucket, logPath)).andReturn(true);
expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
expect(storage.get(bucket, logPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
);
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(taskid, 0);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
}
@Test
public void testStreamTaskLogWithPositiveOffset() throws Exception {
final String testLog = "hello this is a log";
final String logPath = prefix + "/" + taskid;
expect(storage.exists(bucket, logPath)).andReturn(true);
expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
expect(storage.get(bucket, logPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
);
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(taskid, 5);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(5));
verifyAll();
}
@Test
public void testStreamTaskLogWithNegative() throws Exception {
final String testLog = "hello this is a log";
final String logPath = prefix + "/" + taskid;
expect(storage.exists(bucket, logPath)).andReturn(true);
expect(storage.size(bucket, logPath)).andReturn((long) testLog.length());
expect(storage.get(bucket, logPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))
);
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(taskid, -3);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
verifyAll();
}
}

View File

@ -420,12 +420,18 @@ public class JobHelper
switch (outputFS.getScheme()) { switch (outputFS.getScheme()) {
case "hdfs": case "hdfs":
case "viewfs": case "viewfs":
case "gs":
loadSpec = ImmutableMap.<String, Object>of( loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs", "type", "hdfs",
"path", indexOutURI.toString() "path", indexOutURI.toString()
); );
break; break;
case "gs":
loadSpec = ImmutableMap.<String, Object>of(
"type", "google",
"bucket", indexOutURI.getHost(),
"path", indexOutURI.getPath().substring(1) // remove the leading "/"
);
break;
case "s3": case "s3":
case "s3n": case "s3n":
loadSpec = ImmutableMap.<String, Object>of( loadSpec = ImmutableMap.<String, Object>of(
@ -730,6 +736,8 @@ public class JobHelper
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) { } else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString()); segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("google".equals(type)) {
segmentLocURI = URI.create(String.format("gs://%s/%s", loadSpec.get("bucket"), loadSpec.get("path")));
} else if ("local".equals(type)) { } else if ("local".equals(type)) {
try { try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);

View File

@ -112,6 +112,7 @@
<module>extensions-contrib/statsd-emitter</module> <module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module> <module>extensions-contrib/orc-extensions</module>
<module>extensions-contrib/time-min-max</module> <module>extensions-contrib/time-min-max</module>
<module>extensions-contrib/google-extensions</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
@ -632,12 +633,12 @@
<artifactId>google-http-client</artifactId> <artifactId>google-http-client</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>org.apache.httpcomponents</groupId> <groupId>com.google.http-client</groupId>
<artifactId>httpclient</artifactId> <artifactId>google-http-client-jackson2</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>org.apache.httpcomponents</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>httpcore</artifactId> <artifactId>jackson-databind</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>