diff --git a/distribution/pom.xml b/distribution/pom.xml index e14101c4b79..db6ac8d099d 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -91,6 +91,8 @@ io.druid.extensions:druid-rabbitmq -c io.druid.extensions:druid-s3-extensions + -c + io.druid.extensions:druid-cloudfiles-extensions diff --git a/docs/content/dependencies/deep-storage.md b/docs/content/dependencies/deep-storage.md index 325b4c14cc0..95e51b3dee1 100644 --- a/docs/content/dependencies/deep-storage.md +++ b/docs/content/dependencies/deep-storage.md @@ -69,3 +69,21 @@ Please note that this is a community contributed module and does not support Cas |`druid.azure.maxTries`||Number of tries before cancel an Azure operation.|3| Please note that this is a community contributed module. See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information. + +### Rackspace + +[Rackspace Cloud Files](http://www.rackspace.com/cloud/files/) is another option for deep storage. This requires some additional Druid configuration. + +|Property|Possible Values|Description|Default| +|--------|---------------|-----------|-------| +|`druid.storage.type`|cloudfiles||Must be set.| +|`druid.storage.region`||Rackspace Cloud Files region.|Must be set.| +|`druid.storage.container`||Rackspace Cloud Files container name.|Must be set.| +|`druid.storage.basePath`||Rackspace Cloud Files base path to use in the container.|Must be set.| +|`druid.storage.operationMaxRetries`||Number of tries before canceling a Rackspace operation.|10| +|`druid.cloudfiles.userName`||Rackspace Cloud username.|Must be set.| +|`druid.cloudfiles.apiKey`||Rackspace Cloud API key.|Must be set.| +|`druid.cloudfiles.provider`|rackspace-cloudfiles-us,rackspace-cloudfiles-uk|Name of the provider depending on the region.|Must be set.| +|`druid.cloudfiles.useServiceNet`|true,false|Whether to use the internal service net.|true| + +Please note that this is a community contributed module. 
diff --git a/extensions/cloudfiles-extensions/pom.xml b/extensions/cloudfiles-extensions/pom.xml new file mode 100644 index 00000000000..7358d4b60b5 --- /dev/null +++ b/extensions/cloudfiles-extensions/pom.xml @@ -0,0 +1,106 @@ + + + + + + 4.0.0 + io.druid.extensions + druid-cloudfiles-extensions + druid-cloudfiles-extensions + druid-cloudfiles-extensions + + + io.druid + druid + 0.9.0-SNAPSHOT + ../../pom.xml + + + + UTF-8 + 1.9.1 + + 3.0 + + + + + io.druid + druid-api + + + com.google.inject + guice + ${guice.version} + + + + com.google.inject.extensions + guice-servlet + ${guice.version} + + + + com.google.inject.extensions + guice-multibindings + ${guice.version} + + + + + org.apache.jclouds.driver + jclouds-slf4j + ${jclouds.version} + + + org.apache.jclouds.driver + jclouds-sshj + ${jclouds.version} + + + + org.apache.jclouds.provider + rackspace-cloudfiles-us + ${jclouds.version} + + + + org.apache.jclouds.provider + rackspace-cloudfiles-uk + ${jclouds.version} + + + + + junit + junit + test + + + org.easymock + easymock + test + + + + diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesAccountConfig.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesAccountConfig.java new file mode 100644 index 00000000000..9a1d7774ae3 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesAccountConfig.java @@ -0,0 +1,63 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; + +public class CloudFilesAccountConfig +{ + + @JsonProperty + @NotNull + private String provider; + + @JsonProperty + @NotNull + private String userName; + + @JsonProperty + @NotNull + private String apiKey; + + @JsonProperty + @NotNull + private boolean useServiceNet = true; + + public String getProvider() + { + return provider; + } + + public String getUserName() + { + return userName; + } + + public String getApiKey() + { + return apiKey; + } + + public boolean getUseServiceNet() + { + return useServiceNet; + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java new file mode 100644 index 00000000000..83f462aeaf0 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesByteSource.java @@ -0,0 +1,64 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import org.jclouds.io.Payload; + +import java.io.IOException; +import java.io.InputStream; + +public class CloudFilesByteSource extends ByteSource +{ + + final private CloudFilesObjectApiProxy objectApi; + final private String path; + private Payload payload; + + public CloudFilesByteSource(CloudFilesObjectApiProxy objectApi, String path) + { + this.objectApi = objectApi; + this.path = path; + this.payload = null; + } + + public void closeStream() throws IOException + { + if (payload != null) { + payload.close(); + payload = null; + } + } + + @Override + public InputStream openStream() throws IOException + { + payload = (payload == null) ? objectApi.get(path).getPayload() : payload; + + try { + return payload.openStream(); + } + catch (IOException e) { + if (CloudFilesUtils.CLOUDFILESRETRY.apply(e)) { + throw new IOException("Recoverable exception", e); + } + throw Throwables.propagate(e); + } + } +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java new file mode 100644 index 00000000000..c759fc03f02 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java @@ -0,0 +1,110 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.FileUtils; +import com.metamx.common.ISE; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +/** + * A data segment puller that also handles URI data pulls. 
+ */ +public class CloudFilesDataSegmentPuller implements DataSegmentPuller +{ + + private static final Logger log = new Logger(CloudFilesDataSegmentPuller.class); + private final CloudFilesApi cloudFilesApi; + + @Inject + public CloudFilesDataSegmentPuller(final CloudFilesApi cloudFilesApi) + { + this.cloudFilesApi = cloudFilesApi; + } + + @Override + public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException + { + final Map loadSpec = segment.getLoadSpec(); + final String region = MapUtils.getString(loadSpec, "region"); + final String container = MapUtils.getString(loadSpec, "container"); + final String path = MapUtils.getString(loadSpec, "path"); + + log.info("Pulling index at path[%s] to outDir[%s]", path, outDir); + prepareOutDir(outDir); + getSegmentFiles(region, container, path, outDir); + } + + public FileUtils.FileCopyResult getSegmentFiles(String region, String container, String path, File outDir) + throws SegmentLoadingException + { + CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container); + final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); + + try { + final FileUtils.FileCopyResult result = CompressionUtils.unzip( + byteSource, outDir, + CloudFilesUtils.CLOUDFILESRETRY, true + ); + log.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath()); + return result; + } + catch (Exception e) { + try { + org.apache.commons.io.FileUtils.deleteDirectory(outDir); + } + catch (IOException ioe) { + log.warn( + ioe, "Failed to remove output directory [%s] for segment pulled from [%s]", + outDir.getAbsolutePath(), path + ); + } + throw new SegmentLoadingException(e, e.getMessage()); + } + finally { + try { + byteSource.closeStream(); + } + catch (IOException ioe) { + log.warn(ioe, "Failed to close payload for segmente pulled from [%s]", path); + } + } + } + + private void prepareOutDir(final File outDir) throws ISE 
+ { + if (!outDir.exists()) { + outDir.mkdirs(); + } + + if (!outDir.isDirectory()) { + throw new ISE("outDir[%s] must be a directory.", outDir); + } + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java new file mode 100644 index 00000000000..fdbc322e32f --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -0,0 +1,131 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.storage.cloudfiles; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import com.metamx.common.CompressionUtils; +import com.metamx.common.logger.Logger; +import io.druid.segment.SegmentUtils; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.timeline.DataSegment; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.concurrent.Callable; + +public class CloudFilesDataSegmentPusher implements DataSegmentPusher +{ + + private static final Logger log = new Logger(CloudFilesDataSegmentPusher.class); + private final CloudFilesObjectApiProxy objectApi; + private final CloudFilesDataSegmentPusherConfig config; + private final ObjectMapper jsonMapper; + + @Inject + public CloudFilesDataSegmentPusher( + final CloudFilesApi cloudFilesApi, + final CloudFilesDataSegmentPusherConfig config, final ObjectMapper jsonMapper + ) + { + this.config = config; + String region = this.config.getRegion(); + String container = this.config.getContainer(); + this.objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container); + this.jsonMapper = jsonMapper; + + log.info("Configured CloudFiles as deep storage"); + } + + @Override + public String getPathForHadoop(final String dataSource) + { + return null; + } + + @Override + public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException + { + final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment); + final File zipOutFile = File.createTempFile("druid", "index.zip"); + final File descriptorFile = File.createTempFile("descriptor", ".json"); + + log.info("Copying segment[%s] to CloudFiles at location[%s]", inSegment.getIdentifier(), segmentPath); + + try { + return 
CloudFilesUtils.retryCloudFilesOperation( + new Callable() + { + @Override + public DataSegment call() throws Exception + { + CompressionUtils.zip(indexFilesDir, zipOutFile); + CloudFilesObject segmentData = new CloudFilesObject( + segmentPath, zipOutFile, objectApi.getRegion(), + objectApi.getContainer() + ); + log.info("Pushing %s.", segmentData.getPath()); + objectApi.put(segmentData); + + try (FileOutputStream stream = new FileOutputStream(descriptorFile)) { + stream.write(jsonMapper.writeValueAsBytes(inSegment)); + } + CloudFilesObject descriptorData = new CloudFilesObject( + segmentPath, descriptorFile, + objectApi.getRegion(), objectApi.getContainer() + ); + log.info("Pushing %s.", descriptorData.getPath()); + objectApi.put(descriptorData); + + final DataSegment outSegment = inSegment + .withSize(segmentData.getFile().length()) + .withLoadSpec( + ImmutableMap.of( + "type", + CloudFilesStorageDruidModule.SCHEME, + "region", + segmentData.getRegion(), + "container", + segmentData.getContainer(), + "path", + segmentData.getPath() + ) + ) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + + return outSegment; + } + }, this.config.getOperationMaxRetries() + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + log.info("Deleting zipped index File[%s]", zipOutFile); + zipOutFile.delete(); + + log.info("Deleting descriptor file[%s]", descriptorFile); + descriptorFile.delete(); + } + } +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherConfig.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherConfig.java new file mode 100644 index 00000000000..3d6eff381f2 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherConfig.java @@ -0,0 +1,68 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.validation.constraints.NotNull; + +/** + */ +public class CloudFilesDataSegmentPusherConfig +{ + + @JsonProperty + @NotNull + private String region; + + @JsonProperty + @NotNull + private String container; + + @JsonProperty + @NotNull + private String basePath; + + @JsonProperty + private int operationMaxRetries = 10; + + public String getRegion() + { + Preconditions.checkNotNull(region); + return region; + } + + public String getContainer() + { + Preconditions.checkNotNull(container); + return container; + } + + public String getBasePath() + { + Preconditions.checkNotNull(basePath); + return basePath; + } + + public int getOperationMaxRetries() + { + return operationMaxRetries; + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java new file mode 100644 index 00000000000..c72532c2620 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java @@ -0,0 +1,65 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import io.druid.segment.loading.LoadSpec; +import io.druid.segment.loading.SegmentLoadingException; + +import java.io.File; + +@JsonTypeName(CloudFilesStorageDruidModule.SCHEME) +public class CloudFilesLoadSpec implements LoadSpec +{ + + @JsonProperty + private final String region; + + @JsonProperty + private final String container; + + @JsonProperty + private final String path; + + private final CloudFilesDataSegmentPuller puller; + + @JsonCreator + public CloudFilesLoadSpec( + @JsonProperty("region") String region, @JsonProperty("container") String container, + @JsonProperty("path") String path, @JacksonInject CloudFilesDataSegmentPuller puller + ) + { + Preconditions.checkNotNull(region); + Preconditions.checkNotNull(container); + Preconditions.checkNotNull(path); + this.container = container; + this.region = region; + this.path = path; + this.puller = puller; + } + + @Override + public LoadSpecResult loadSegment(File file) throws SegmentLoadingException + { + return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file).size()); + } +} diff --git 
a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObject.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObject.java new file mode 100644 index 00000000000..8f07b369653 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObject.java @@ -0,0 +1,88 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.storage.cloudfiles; + +import com.google.common.io.ByteSource; +import com.google.common.io.Files; +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; + +import java.io.File; + +public class CloudFilesObject +{ + + private File file; + private Payload payload; + private String path; + private final String region; + private final String container; + + public CloudFilesObject(final String basePath, final File file, final String region, final String container) + { + this(region, container); + this.file = file; + ByteSource byteSource = Files.asByteSource(file); + this.payload = Payloads.newByteSourcePayload(byteSource); + this.path = CloudFilesUtils.buildCloudFilesPath(basePath, file.getName()); + } + + public CloudFilesObject(final Payload payload, final String region, final String container, final String path) + { + this(region, container, path); + this.payload = payload; + } + + private CloudFilesObject(final String region, final String container, final String path) + { + this(region, container); + this.path = path; + } + + private CloudFilesObject(final String region, final String container) + { + this.region = region; + this.container = container; + } + + public File getFile() + { + return file; + } + + public String getRegion() + { + return region; + } + + public String getContainer() + { + return container; + } + + public String getPath() + { + return path; + } + + public Payload getPayload() + { + return payload; + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java new file mode 100644 index 00000000000..7224856fa00 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. 
+ * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import org.jclouds.io.Payload; +import org.jclouds.openstack.swift.v1.domain.SwiftObject; +import org.jclouds.openstack.swift.v1.features.ObjectApi; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; + +public class CloudFilesObjectApiProxy +{ + private final ObjectApi objectApi; + private final String region; + private final String container; + + public CloudFilesObjectApiProxy(final CloudFilesApi cloudFilesApi, final String region, final String container) + { + this.region = region; + this.container = container; + this.objectApi = cloudFilesApi.getObjectApi(region, container); + } + + public String getRegion() + { + return region; + } + + public String getContainer() + { + return container; + } + + public String put(final CloudFilesObject cloudFilesObject) + { + return objectApi.put(cloudFilesObject.getPath(), cloudFilesObject.getPayload()); + } + + public CloudFilesObject get(String path) + { + SwiftObject swiftObject = objectApi.get(path); + Payload payload = swiftObject.getPayload(); + return new CloudFilesObject(payload, this.region, this.container, path); + } +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java new file 
mode 100644 index 00000000000..9b70aed6665 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java @@ -0,0 +1,117 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.fasterxml.jackson.core.Version; +import com.fasterxml.jackson.databind.Module; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.metamx.common.logger.Logger; +import io.druid.guice.Binders; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.initialization.DruidModule; +import org.jclouds.ContextBuilder; +import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; +import org.jclouds.openstack.v2_0.config.InternalUrlModule; +import org.jclouds.osgi.ProviderRegistry; +import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata; +import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; + +import java.util.List; + +/** + */ +public class CloudFilesStorageDruidModule implements DruidModule +{ + + private static final Logger log = new Logger(CloudFilesStorageDruidModule.class); + public static final String SCHEME = 
"cloudfiles"; + + @Override + public List getJacksonModules() + { + log.info("Getting jackson modules..."); + + return ImmutableList.of( + new Module() + { + @Override + public String getModuleName() + { + return "CloudFiles-" + System.identityHashCode(this); + } + + @Override + public Version version() + { + return Version.unknownVersion(); + } + + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(CloudFilesLoadSpec.class); + } + } + ); + } + + @Override + public void configure(Binder binder) + { + log.info("Configuring CloudFilesStorageDruidModule..."); + JsonConfigProvider.bind(binder, "druid.storage", CloudFilesDataSegmentPusherConfig.class); + JsonConfigProvider.bind(binder, "druid.cloudfiles", CloudFilesAccountConfig.class); + + Binders.dataSegmentPullerBinder(binder).addBinding(SCHEME).to(CloudFilesDataSegmentPuller.class) + .in(LazySingleton.class); + Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(CloudFilesDataSegmentPusher.class) + .in(LazySingleton.class); + + log.info("Configured CloudFilesStorageDruidModule."); + } + + @Provides + @LazySingleton + public CloudFilesApi getCloudFilesApi(final CloudFilesAccountConfig config) + { + log.info("Building Cloud Files Api..."); + + Iterable modules = null; + if (config.getUseServiceNet()) { + log.info("Configuring Cloud Files Api to use the internal service network..."); + modules = ImmutableSet.of(new SLF4JLoggingModule(), new InternalUrlModule()); + } else { + log.info("Configuring Cloud Files Api to use the public network..."); + modules = ImmutableSet.of(new SLF4JLoggingModule()); + } + + ProviderRegistry.registerProvider(CloudFilesUSProviderMetadata.builder().build()); + ProviderRegistry.registerProvider(CloudFilesUKProviderMetadata.builder().build()); + ContextBuilder cb = ContextBuilder.newBuilder(config.getProvider()) + .credentials(config.getUserName(), config.getApiKey()).modules(modules); + CloudFilesApi cfa = cb.buildApi(CloudFilesApi.class); + 
log.info("Cloud Files Api built."); + return cfa; + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java new file mode 100644 index 00000000000..a90871eabc1 --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesUtils.java @@ -0,0 +1,75 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.cloudfiles; + +import com.google.common.base.Predicate; +import com.metamx.common.RetryUtils; +import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.timeline.DataSegment; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * + */ +public class CloudFilesUtils +{ + + public static final Predicate CLOUDFILESRETRY = new Predicate() + { + @Override + public boolean apply(Throwable e) + { + if (e == null) { + return false; + } else if (e instanceof IOException) { + return true; + } else { + return apply(e.getCause()); + } + } + }; + + /** + * Retries CloudFiles operations that fail due to io-related exceptions. 
+ */ + public static T retryCloudFilesOperation(Callable f, final int maxTries) throws Exception + { + return RetryUtils.retry(f, CLOUDFILESRETRY, maxTries); + } + + public static String buildCloudFilesPath(String basePath, final String fileName) + { + String path = fileName; + if (!basePath.isEmpty()) { + int lastSlashIndex = basePath.lastIndexOf("/"); + if (lastSlashIndex != -1) { + basePath = basePath.substring(0, lastSlashIndex); + } + path = basePath + "/" + fileName; + } + return path; + } + + public static String buildCloudFilesPath(String basePath, final DataSegment segment) + { + return buildCloudFilesPath(basePath, DataSegmentPusherUtil.getStorageDir(segment)); + } + +} diff --git a/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..184f8677e4d --- /dev/null +++ b/extensions/cloudfiles-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1,20 @@ +# +# Licensed to Metamarkets Group Inc. (Metamarkets) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. Metamarkets licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +io.druid.storage.cloudfiles.CloudFilesStorageDruidModule diff --git a/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java b/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java new file mode 100644 index 00000000000..50cfe3875cc --- /dev/null +++ b/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesByteSourceTest.java @@ -0,0 +1,88 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.storage.cloudfiles; + +import org.easymock.EasyMockSupport; +import org.jclouds.io.Payload; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class CloudFilesByteSourceTest extends EasyMockSupport +{ + + @Test + public void openStreamTest() throws IOException + { + final String path = "path"; + + CloudFilesObjectApiProxy objectApi = createMock(CloudFilesObjectApiProxy.class); + CloudFilesObject cloudFilesObject = createMock(CloudFilesObject.class); + Payload payload = createMock(Payload.class); + InputStream stream = createMock(InputStream.class); + + expect(objectApi.get(path)).andReturn(cloudFilesObject); + expect(cloudFilesObject.getPayload()).andReturn(payload); + expect(payload.openStream()).andReturn(stream); + payload.close(); + + replayAll(); + + CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); + assertEquals(stream, byteSource.openStream()); + byteSource.closeStream(); + + verifyAll(); + } + + @Test() + public void openStreamWithRecoverableErrorTest() throws IOException + { + final String path = "path"; + + CloudFilesObjectApiProxy objectApi = createMock(CloudFilesObjectApiProxy.class); + CloudFilesObject cloudFilesObject = createMock(CloudFilesObject.class); + Payload payload = createMock(Payload.class); + InputStream stream = createMock(InputStream.class); + + expect(objectApi.get(path)).andReturn(cloudFilesObject); + expect(cloudFilesObject.getPayload()).andReturn(payload); + expect(payload.openStream()).andThrow(new IOException()).andReturn(stream); + payload.close(); + + replayAll(); + + CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path); + try { + byteSource.openStream(); + } + catch (Exception e) { + assertEquals("Recoverable exception", e.getMessage()); + } + + assertEquals(stream, byteSource.openStream()); + byteSource.closeStream(); + 
+ verifyAll(); + } + +} diff --git a/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java b/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java new file mode 100644 index 00000000000..2346c434b6d --- /dev/null +++ b/extensions/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxyTest.java @@ -0,0 +1,62 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.storage.cloudfiles; + +import org.easymock.EasyMockSupport; +import org.jclouds.io.Payload; +import org.jclouds.openstack.swift.v1.domain.SwiftObject; +import org.jclouds.openstack.swift.v1.features.ObjectApi; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class CloudFilesObjectApiProxyTest extends EasyMockSupport +{ + + @Test + public void getTest() + { + final String path = "path"; + final String region = "region"; + final String container = "container"; + + CloudFilesApi cloudFilesApi = createMock(CloudFilesApi.class); + ObjectApi objectApi = createMock(ObjectApi.class); + SwiftObject swiftObject = createMock(SwiftObject.class); + Payload payload = createMock(Payload.class); + + expect(cloudFilesApi.getObjectApi(region, container)).andReturn(objectApi); + expect(objectApi.get(path)).andReturn(swiftObject); + expect(swiftObject.getPayload()).andReturn(payload); + + replayAll(); + + CloudFilesObjectApiProxy cfoApiProxy = new CloudFilesObjectApiProxy(cloudFilesApi, region, container); + CloudFilesObject cloudFilesObject = cfoApiProxy.get(path); + + assertEquals(cloudFilesObject.getPayload(), payload); + assertEquals(cloudFilesObject.getRegion(), region); + assertEquals(cloudFilesObject.getContainer(), container); + assertEquals(cloudFilesObject.getPath(), path); + + verifyAll(); + } + +} diff --git a/pom.xml b/pom.xml index a83cfed22f8..15089cc8f01 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ extensions/azure-extensions extensions/namespace-lookup extensions/kafka-extraction-namespace + extensions/cloudfiles-extensions distribution