Merge pull request #1719 from se7entyse7en/feature-rackspace-cloud-files-deep-storage

Add Rackspace Cloud Files Deep Storage Extension
This commit is contained in:
Fangjin Yang 2015-11-04 11:13:25 -08:00
commit 1cbc514eb5
17 changed files with 1137 additions and 0 deletions

View File

@ -91,6 +91,8 @@
<argument>io.druid.extensions:druid-rabbitmq</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-s3-extensions</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-cloudfiles-extensions</argument>
</arguments>
</configuration>
</execution>

View File

@ -69,3 +69,21 @@ Please note that this is a community contributed module and does not support Cas
|`druid.azure.maxTries`||Number of tries before cancel an Azure operation.|3|
Please note that this is a community contributed module. See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information.
### Rackspace
[Rackspace Cloud Files](http://www.rackspace.com/cloud/files/) is another option for deep storage. This requires some additional druid configuration.
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.storage.type`|cloudfiles||Must be set.|
|`druid.storage.region`||Rackspace Cloud Files region.|Must be set.|
|`druid.storage.container`||Rackspace Cloud Files container name.|Must be set.|
|`druid.storage.basePath`||Rackspace Cloud Files base path to use in the container.|Must be set.|
|`druid.storage.operationMaxRetries`||Number of tries before cancel a Rackspace operation.|10|
|`druid.cloudfiles.userName`||Rackspace Cloud username|Must be set.|
|`druid.cloudfiles.apiKey`||Rackspace Cloud api key.|Must be set.|
|`druid.cloudfiles.provider`|rackspace-cloudfiles-us,rackspace-cloudfiles-uk|Name of the provider depending on the region.|Must be set.|
|`druid.cloudfiles.useServiceNet`|true,false|Whether to use the internal service net.|true|
Please note that this is a community contributed module.

View File

@ -0,0 +1,106 @@
<?xml version="1.0"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-cloudfiles-extensions</artifactId>
<name>druid-cloudfiles-extensions</name>
<description>druid-cloudfiles-extensions</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jclouds.version>1.9.1</jclouds.version>
<!-- The version of guice is forced to 3.0 since JClouds 1.9.1 does not
work with guice 4.0-beta -->
<guice.version>3.0</guice.version>
</properties>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>${guice.version}</version>
<!--$NO-MVN-MAN-VER$ -->
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
<version>${guice.version}</version>
<!--$NO-MVN-MAN-VER$ -->
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>${guice.version}</version>
<!--$NO-MVN-MAN-VER$ -->
</dependency>
<!-- jclouds dependencies -->
<dependency>
<groupId>org.apache.jclouds.driver</groupId>
<artifactId>jclouds-slf4j</artifactId>
<version>${jclouds.version}</version>
</dependency>
<dependency>
<groupId>org.apache.jclouds.driver</groupId>
<artifactId>jclouds-sshj</artifactId>
<version>${jclouds.version}</version>
</dependency>
<!-- Rackspace US dependencies -->
<dependency>
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>rackspace-cloudfiles-us</artifactId>
<version>${jclouds.version}</version>
</dependency>
<!-- Rackspace UK dependencies -->
<dependency>
<groupId>org.apache.jclouds.provider</groupId>
<artifactId>rackspace-cloudfiles-uk</artifactId>
<version>${jclouds.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,63 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
public class CloudFilesAccountConfig
{
@JsonProperty
@NotNull
private String provider;
@JsonProperty
@NotNull
private String userName;
@JsonProperty
@NotNull
private String apiKey;
@JsonProperty
@NotNull
private boolean useServiceNet = true;
public String getProvider()
{
return provider;
}
public String getUserName()
{
return userName;
}
public String getApiKey()
{
return apiKey;
}
public boolean getUseServiceNet()
{
return useServiceNet;
}
}

View File

@ -0,0 +1,64 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import org.jclouds.io.Payload;
import java.io.IOException;
import java.io.InputStream;
public class CloudFilesByteSource extends ByteSource
{
final private CloudFilesObjectApiProxy objectApi;
final private String path;
private Payload payload;
public CloudFilesByteSource(CloudFilesObjectApiProxy objectApi, String path)
{
this.objectApi = objectApi;
this.path = path;
this.payload = null;
}
public void closeStream() throws IOException
{
if (payload != null) {
payload.close();
payload = null;
}
}
@Override
public InputStream openStream() throws IOException
{
payload = (payload == null) ? objectApi.get(path).getPayload() : payload;
try {
return payload.openStream();
}
catch (IOException e) {
if (CloudFilesUtils.CLOUDFILESRETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.FileUtils;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
* A data segment puller that also handles URI data pulls.
*/
public class CloudFilesDataSegmentPuller implements DataSegmentPuller
{
private static final Logger log = new Logger(CloudFilesDataSegmentPuller.class);
private final CloudFilesApi cloudFilesApi;
@Inject
public CloudFilesDataSegmentPuller(final CloudFilesApi cloudFilesApi)
{
this.cloudFilesApi = cloudFilesApi;
}
@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final String region = MapUtils.getString(loadSpec, "region");
final String container = MapUtils.getString(loadSpec, "container");
final String path = MapUtils.getString(loadSpec, "path");
log.info("Pulling index at path[%s] to outDir[%s]", path, outDir);
prepareOutDir(outDir);
getSegmentFiles(region, container, path, outDir);
}
public FileUtils.FileCopyResult getSegmentFiles(String region, String container, String path, File outDir)
throws SegmentLoadingException
{
CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path);
try {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource, outDir,
CloudFilesUtils.CLOUDFILESRETRY, true
);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
return result;
}
catch (Exception e) {
try {
org.apache.commons.io.FileUtils.deleteDirectory(outDir);
}
catch (IOException ioe) {
log.warn(
ioe, "Failed to remove output directory [%s] for segment pulled from [%s]",
outDir.getAbsolutePath(), path
);
}
throw new SegmentLoadingException(e, e.getMessage());
}
finally {
try {
byteSource.closeStream();
}
catch (IOException ioe) {
log.warn(ioe, "Failed to close payload for segmente pulled from [%s]", path);
}
}
}
private void prepareOutDir(final File outDir) throws ISE
{
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
}
}

View File

@ -0,0 +1,131 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
public class CloudFilesDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(CloudFilesDataSegmentPusher.class);
private final CloudFilesObjectApiProxy objectApi;
private final CloudFilesDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public CloudFilesDataSegmentPusher(
final CloudFilesApi cloudFilesApi,
final CloudFilesDataSegmentPusherConfig config, final ObjectMapper jsonMapper
)
{
this.config = config;
String region = this.config.getRegion();
String container = this.config.getContainer();
this.objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
this.jsonMapper = jsonMapper;
log.info("Configured CloudFiles as deep storage");
}
@Override
public String getPathForHadoop(final String dataSource)
{
return null;
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment);
final File zipOutFile = File.createTempFile("druid", "index.zip");
final File descriptorFile = File.createTempFile("descriptor", ".json");
log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath);
try {
return CloudFilesUtils.retryCloudFilesOperation(
new Callable<DataSegment>()
{
@Override
public DataSegment call() throws Exception
{
CompressionUtils.zip(indexFilesDir, zipOutFile);
CloudFilesObject segmentData = new CloudFilesObject(
segmentPath, zipOutFile, objectApi.getRegion(),
objectApi.getContainer()
);
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
try (FileOutputStream stream = new FileOutputStream(descriptorFile)) {
stream.write(jsonMapper.writeValueAsBytes(inSegment));
}
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descriptorFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
final DataSegment outSegment = inSegment
.withSize(segmentData.getFile().length())
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
CloudFilesStorageDruidModule.SCHEME,
"region",
segmentData.getRegion(),
"container",
segmentData.getContainer(),
"path",
segmentData.getPath()
)
)
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
return outSegment;
}
}, this.config.getOperationMaxRetries()
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.validation.constraints.NotNull;
/**
*/
public class CloudFilesDataSegmentPusherConfig
{
@JsonProperty
@NotNull
private String region;
@JsonProperty
@NotNull
private String container;
@JsonProperty
@NotNull
private String basePath;
@JsonProperty
private int operationMaxRetries = 10;
public String getRegion()
{
Preconditions.checkNotNull(region);
return region;
}
public String getContainer()
{
Preconditions.checkNotNull(container);
return container;
}
public String getBasePath()
{
Preconditions.checkNotNull(basePath);
return basePath;
}
public int getOperationMaxRetries()
{
return operationMaxRetries;
}
}

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import java.io.File;
@JsonTypeName(CloudFilesStorageDruidModule.SCHEME)
public class CloudFilesLoadSpec implements LoadSpec
{
@JsonProperty
private final String region;
@JsonProperty
private final String container;
@JsonProperty
private final String path;
private final CloudFilesDataSegmentPuller puller;
@JsonCreator
public CloudFilesLoadSpec(
@JsonProperty("region") String region, @JsonProperty("container") String container,
@JsonProperty("path") String path, @JacksonInject CloudFilesDataSegmentPuller puller
)
{
Preconditions.checkNotNull(region);
Preconditions.checkNotNull(container);
Preconditions.checkNotNull(path);
this.container = container;
this.region = region;
this.path = path;
this.puller = puller;
}
@Override
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file).size());
}
}

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import java.io.File;
public class CloudFilesObject
{
private File file;
private Payload payload;
private String path;
private final String region;
private final String container;
public CloudFilesObject(final String basePath, final File file, final String region, final String container)
{
this(region, container);
this.file = file;
ByteSource byteSource = Files.asByteSource(file);
this.payload = Payloads.newByteSourcePayload(byteSource);
this.path = CloudFilesUtils.buildCloudFilesPath(basePath, file.getName());
}
public CloudFilesObject(final Payload payload, final String region, final String container, final String path)
{
this(region, container, path);
this.payload = payload;
}
private CloudFilesObject(final String region, final String container, final String path)
{
this(region, container);
this.path = path;
}
private CloudFilesObject(final String region, final String container)
{
this.region = region;
this.container = container;
}
public File getFile()
{
return file;
}
public String getRegion()
{
return region;
}
public String getContainer()
{
return container;
}
public String getPath()
{
return path;
}
public Payload getPayload()
{
return payload;
}
}

View File

@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import org.jclouds.io.Payload;
import org.jclouds.openstack.swift.v1.domain.SwiftObject;
import org.jclouds.openstack.swift.v1.features.ObjectApi;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
public class CloudFilesObjectApiProxy
{
private final ObjectApi objectApi;
private final String region;
private final String container;
public CloudFilesObjectApiProxy(final CloudFilesApi cloudFilesApi, final String region, final String container)
{
this.region = region;
this.container = container;
this.objectApi = cloudFilesApi.getObjectApi(region, container);
}
public String getRegion()
{
return region;
}
public String getContainer()
{
return container;
}
public String put(final CloudFilesObject cloudFilesObject)
{
return objectApi.put(cloudFilesObject.getPath(), cloudFilesObject.getPayload());
}
public CloudFilesObject get(String path)
{
SwiftObject swiftObject = objectApi.get(path);
Payload payload = swiftObject.getPayload();
return new CloudFilesObject(payload, this.region, this.container, path);
}
}

View File

@ -0,0 +1,117 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import org.jclouds.ContextBuilder;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.openstack.v2_0.config.InternalUrlModule;
import org.jclouds.osgi.ProviderRegistry;
import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata;
import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import java.util.List;
/**
*/
public class CloudFilesStorageDruidModule implements DruidModule
{
private static final Logger log = new Logger(CloudFilesStorageDruidModule.class);
public static final String SCHEME = "cloudfiles";
@Override
public List<? extends Module> getJacksonModules()
{
log.info("Getting jackson modules...");
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "CloudFiles-" + System.identityHashCode(this);
}
@Override
public Version version()
{
return Version.unknownVersion();
}
@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(CloudFilesLoadSpec.class);
}
}
);
}
@Override
public void configure(Binder binder)
{
log.info("Configuring CloudFilesStorageDruidModule...");
JsonConfigProvider.bind(binder, "druid.storage", CloudFilesDataSegmentPusherConfig.class);
JsonConfigProvider.bind(binder, "druid.cloudfiles", CloudFilesAccountConfig.class);
Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(CloudFilesDataSegmentPuller.class)
.in(LazySingleton.class);
Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(CloudFilesDataSegmentPusher.class)
.in(LazySingleton.class);
log.info("Configured CloudFilesStorageDruidModule.");
}
@Provides
@LazySingleton
public CloudFilesApi getCloudFilesApi(final CloudFilesAccountConfig config)
{
log.info("Building Cloud Files Api...");
Iterable<com.google.inject.Module> modules = null;
if (config.getUseServiceNet()) {
log.info("Configuring Cloud Files Api to use the internal service network...");
modules = ImmutableSet.<com.google.inject.Module>of(new SLF4JLoggingModule(), new InternalUrlModule());
} else {
log.info("Configuring Cloud Files Api to use the public network...");
modules = ImmutableSet.<com.google.inject.Module>of(new SLF4JLoggingModule());
}
ProviderRegistry.registerProvider(CloudFilesUSProviderMetadata.builder().build());
ProviderRegistry.registerProvider(CloudFilesUKProviderMetadata.builder().build());
ContextBuilder cb = ContextBuilder.newBuilder(config.getProvider())
.credentials(config.getUserName(), config.getApiKey()).modules(modules);
CloudFilesApi cfa = cb.buildApi(CloudFilesApi.class);
log.info("Cloud Files Api built.");
return cfa;
}
}

View File

@ -0,0 +1,75 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.concurrent.Callable;
/**
*
*/
public class CloudFilesUtils
{
public static final Predicate<Throwable> CLOUDFILESRETRY = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
if (e == null) {
return false;
} else if (e instanceof IOException) {
return true;
} else {
return apply(e.getCause());
}
}
};
/**
* Retries CloudFiles operations that fail due to io-related exceptions.
*/
public static <T> T retryCloudFilesOperation(Callable<T> f, final int maxTries) throws Exception
{
return RetryUtils.retry(f, CLOUDFILESRETRY, maxTries);
}
public static String buildCloudFilesPath(String basePath, final String fileName)
{
String path = fileName;
if (!basePath.isEmpty()) {
int lastSlashIndex = basePath.lastIndexOf("/");
if (lastSlashIndex != -1) {
basePath = basePath.substring(0, lastSlashIndex);
}
path = basePath + "/" + fileName;
}
return path;
}
public static String buildCloudFilesPath(String basePath, final DataSegment segment)
{
return buildCloudFilesPath(basePath, DataSegmentPusherUtil.getStorageDir(segment));
}
}

View File

@ -0,0 +1,20 @@
#
# Licensed to Metamarkets Group Inc. (Metamarkets) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Metamarkets licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
io.druid.storage.cloudfiles.CloudFilesStorageDruidModule

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import org.easymock.EasyMockSupport;
import org.jclouds.io.Payload;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class CloudFilesByteSourceTest extends EasyMockSupport
{
@Test
public void openStreamTest() throws IOException
{
final String path = "path";
CloudFilesObjectApiProxy objectApi = createMock(CloudFilesObjectApiProxy.class);
CloudFilesObject cloudFilesObject = createMock(CloudFilesObject.class);
Payload payload = createMock(Payload.class);
InputStream stream = createMock(InputStream.class);
expect(objectApi.get(path)).andReturn(cloudFilesObject);
expect(cloudFilesObject.getPayload()).andReturn(payload);
expect(payload.openStream()).andReturn(stream);
payload.close();
replayAll();
CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path);
assertEquals(stream, byteSource.openStream());
byteSource.closeStream();
verifyAll();
}
@Test()
public void openStreamWithRecoverableErrorTest() throws IOException
{
final String path = "path";
CloudFilesObjectApiProxy objectApi = createMock(CloudFilesObjectApiProxy.class);
CloudFilesObject cloudFilesObject = createMock(CloudFilesObject.class);
Payload payload = createMock(Payload.class);
InputStream stream = createMock(InputStream.class);
expect(objectApi.get(path)).andReturn(cloudFilesObject);
expect(cloudFilesObject.getPayload()).andReturn(payload);
expect(payload.openStream()).andThrow(new IOException()).andReturn(stream);
payload.close();
replayAll();
CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path);
try {
byteSource.openStream();
}
catch (Exception e) {
assertEquals("Recoverable exception", e.getMessage());
}
assertEquals(stream, byteSource.openStream());
byteSource.closeStream();
verifyAll();
}
}

View File

@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.cloudfiles;
import org.easymock.EasyMockSupport;
import org.jclouds.io.Payload;
import org.jclouds.openstack.swift.v1.domain.SwiftObject;
import org.jclouds.openstack.swift.v1.features.ObjectApi;
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import org.junit.Test;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class CloudFilesObjectApiProxyTest extends EasyMockSupport
{
@Test
public void getTest()
{
final String path = "path";
final String region = "region";
final String container = "container";
CloudFilesApi cloudFilesApi = createMock(CloudFilesApi.class);
ObjectApi objectApi = createMock(ObjectApi.class);
SwiftObject swiftObject = createMock(SwiftObject.class);
Payload payload = createMock(Payload.class);
expect(cloudFilesApi.getObjectApi(region, container)).andReturn(objectApi);
expect(objectApi.get(path)).andReturn(swiftObject);
expect(swiftObject.getPayload()).andReturn(payload);
replayAll();
CloudFilesObjectApiProxy cfoApiProxy = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
CloudFilesObject cloudFilesObject = cfoApiProxy.get(path);
assertEquals(cloudFilesObject.getPayload(), payload);
assertEquals(cloudFilesObject.getRegion(), region);
assertEquals(cloudFilesObject.getContainer(), container);
assertEquals(cloudFilesObject.getPath(), path);
verifyAll();
}
}

View File

@ -104,6 +104,7 @@
<module>extensions/azure-extensions</module>
<module>extensions/namespace-lookup</module>
<module>extensions/kafka-extraction-namespace</module>
<module>extensions/cloudfiles-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>