makeLoadSpec(String bucket, String key)
+ {
+ return ImmutableMap.of(
+ "type",
+ OssStorageDruidModule.SCHEME_ZIP,
+ "bucket",
+ bucket,
+ "key",
+ key
+ );
+ }
+
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssInputDataConfig.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssInputDataConfig.java
new file mode 100644
index 00000000000..c2ef2dfb465
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssInputDataConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.Max;
+import javax.validation.constraints.Min;
+
+/**
+ * Configuration holder for the options that control how Druid reads
+ * input data from aliyun OSS.
+ */
+public class OssInputDataConfig
+{
+  /**
+   * Upper bound on the number of input files matching a given prefix that
+   * are requested from aliyun OSS in one listing call. Validation constrains
+   * the value to [1, {@link OssUtils#MAX_LISTING_LENGTH}] (documented as
+   * [1,1000]); the default is that maximum.
+   */
+  @JsonProperty
+  @Min(1)
+  @Max(OssUtils.MAX_LISTING_LENGTH)
+  private int maxListingLength = OssUtils.MAX_LISTING_LENGTH;
+
+  public int getMaxListingLength()
+  {
+    return this.maxListingLength;
+  }
+
+  public void setMaxListingLength(int maxListingLength)
+  {
+    this.maxListingLength = maxListingLength;
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssLoadSpec.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssLoadSpec.java
new file mode 100644
index 00000000000..155c26fbf3c
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssLoadSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+
+import java.io.File;
+
+/**
+ * {@link LoadSpec} registered under the {@link OssStorageDruidModule#SCHEME_ZIP} type name. It identifies a
+ * segment by bucket and key and delegates the actual download to an injected {@link OssDataSegmentPuller}.
+ */
+@JsonTypeName(OssStorageDruidModule.SCHEME_ZIP)
+public class OssLoadSpec implements LoadSpec
+{
+  private final OssDataSegmentPuller puller;
+  private final String bucket;
+  private final String key;
+
+  /**
+   * @param puller injected puller used to fetch the segment files
+   * @param bucket bucket holding the segment; must not be null
+   * @param key    object key of the segment; must not be null
+   */
+  @JsonCreator
+  public OssLoadSpec(
+      @JacksonInject OssDataSegmentPuller puller,
+      @JsonProperty(OssDataSegmentPuller.BUCKET) String bucket,
+      @JsonProperty(OssDataSegmentPuller.KEY) String key
+  )
+  {
+    // checkNotNull hands back its argument, so validation and assignment are folded together.
+    this.bucket = Preconditions.checkNotNull(bucket);
+    this.key = Preconditions.checkNotNull(key);
+    this.puller = puller;
+  }
+
+  @JsonProperty(OssDataSegmentPuller.BUCKET)
+  public String getBucket()
+  {
+    return bucket;
+  }
+
+  @JsonProperty(OssDataSegmentPuller.KEY)
+  public String getKey()
+  {
+    return key;
+  }
+
+  /**
+   * Pulls the segment at {@code bucket}/{@code key} into {@code outDir} and reports the size returned by the puller.
+   */
+  @Override
+  public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
+  {
+    final CloudObjectLocation segmentLocation = new CloudObjectLocation(bucket, key);
+    return new LoadSpecResult(puller.getSegmentFiles(segmentLocation, outDir).size());
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssObjectSummaryIterator.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssObjectSummaryIterator.java
new file mode 100644
index 00000000000..8bba8961eee
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssObjectSummaryIterator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import org.apache.druid.java.util.common.RE;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator class used by {@link OssUtils#objectSummaryIterator}.
+ *
+ * Walks the objects found under each of the given prefix URIs, listing them from aliyun OSS incrementally in
+ * batches of at most {@code maxListingLength} (capped at {@link OssUtils#MAX_LISTING_LENGTH}). Zero-sized objects,
+ * which includes directory placeholders, are skipped. Provided at least one prefix is given, the first listing
+ * call is made at the same time the iterator is constructed; with no prefixes the iterator is simply empty.
+ */
+public class OssObjectSummaryIterator implements Iterator<OSSObjectSummary>
+{
+  private final OSS client;
+  private final Iterator<URI> prefixesIterator;
+  private final int maxListingLength;
+
+  private ListObjectsRequest request;
+  private ObjectListing result;
+  private Iterator<OSSObjectSummary> objectSummaryIterator;
+  private OSSObjectSummary currentObjectSummary;
+
+  /**
+   * @param client           OSS client used to issue the listing calls
+   * @param prefixes         URIs to list; the authority of each URI is used as the bucket and the key extracted
+   *                         by {@link OssUtils#extractKey} as the prefix
+   * @param maxListingLength requested batch size; values above {@link OssUtils#MAX_LISTING_LENGTH} are capped
+   */
+  OssObjectSummaryIterator(
+      final OSS client,
+      final Iterable<URI> prefixes,
+      final int maxListingLength
+  )
+  {
+    this.client = client;
+    this.prefixesIterator = prefixes.iterator();
+    this.maxListingLength = Math.min(OssUtils.MAX_LISTING_LENGTH, maxListingLength);
+
+    // With no prefixes there is nothing to list: leave currentObjectSummary null so that hasNext() is false,
+    // instead of failing on prefixesIterator.next() inside prepareNextRequest().
+    if (prefixesIterator.hasNext()) {
+      prepareNextRequest();
+      fetchNextBatch();
+      advanceObjectSummary();
+    }
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return currentObjectSummary != null;
+  }
+
+  /**
+   * Returns the current object summary and advances to the next one that is not skipped.
+   *
+   * @throws NoSuchElementException if there are no more object summaries
+   */
+  @Override
+  public OSSObjectSummary next()
+  {
+    if (currentObjectSummary == null) {
+      throw new NoSuchElementException();
+    }
+
+    final OSSObjectSummary retVal = currentObjectSummary;
+    advanceObjectSummary();
+    return retVal;
+  }
+
+  /**
+   * Builds the listing request for the next prefix. Callers must ensure that a next prefix exists.
+   */
+  private void prepareNextRequest()
+  {
+    final URI currentUri = prefixesIterator.next();
+    final String currentBucket = currentUri.getAuthority();
+    final String currentPrefix = OssUtils.extractKey(currentUri);
+
+    request = new ListObjectsRequest(currentBucket, currentPrefix, null, null, maxListingLength);
+  }
+
+  /**
+   * Executes the current request (with retries), stores its result, and moves the request marker forward so that
+   * a subsequent call continues where this batch ended. Any failure is rethrown as an {@link RE}.
+   */
+  private void fetchNextBatch()
+  {
+    try {
+      result = OssUtils.retry(() -> client.listObjects(request));
+      request.setMarker(result.getNextMarker());
+      objectSummaryIterator = result.getObjectSummaries().iterator();
+    }
+    catch (OSSException e) {
+      throw new RE(
+          e,
+          "Failed to get object summaries from aliyun OSS bucket[%s], prefix[%s]; error: %s",
+          request.getBucketName(),
+          request.getPrefix(),
+          e.getMessage()
+      );
+    }
+    catch (Exception e) {
+      throw new RE(
+          e,
+          "Failed to get object summaries from aliyun OSS bucket[%s], prefix[%s]",
+          request.getBucketName(),
+          request.getPrefix()
+      );
+    }
+  }
+
+  /**
+   * Advance objectSummaryIterator to the next non-placeholder, updating "currentObjectSummary".
+   */
+  private void advanceObjectSummary()
+  {
+    while (objectSummaryIterator.hasNext() || result.isTruncated() || prefixesIterator.hasNext()) {
+      while (objectSummaryIterator.hasNext()) {
+        currentObjectSummary = objectSummaryIterator.next();
+        // skips directories and empty objects
+        if (currentObjectSummary.getSize() > 0 && !isDirectory(currentObjectSummary)) {
+          return;
+        }
+      }
+
+      // Exhausted "objectSummaryIterator" without finding a non-placeholder.
+      if (result.isTruncated()) {
+        // More results remain for the current prefix.
+        fetchNextBatch();
+      } else if (prefixesIterator.hasNext()) {
+        // Current prefix is finished; move on to the next one.
+        prepareNextRequest();
+        fetchNextBatch();
+      }
+    }
+
+    // Truly nothing left to read.
+    currentObjectSummary = null;
+  }
+
+  /**
+   * Checks if a given object is a directory placeholder and should be ignored.
+   *
+   * Based on {@link org.apache.druid.storage.s3.ObjectSummaryIterator} which is adapted from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder().
+   *
+   */
+  private static boolean isDirectory(final OSSObjectSummary objectSummary)
+  {
+    return objectSummary.getSize() == 0 && objectSummary.getKey().endsWith("/");
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageConfig.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageConfig.java
new file mode 100644
index 00000000000..d3edcd6105c
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Configuration bean carrying a bucket name and a key prefix, both defaulting to the empty string.
+ */
+public class OssStorageConfig
+{
+  @JsonProperty
+  private String bucket = "";
+
+  @JsonProperty
+  private String prefix = "";
+
+  public String getBucket()
+  {
+    return this.bucket;
+  }
+
+  public void setBucket(String bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  public String getPrefix()
+  {
+    return this.prefix;
+  }
+
+  public void setPrefix(String prefix)
+  {
+    this.prefix = prefix;
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
new file mode 100644
index 00000000000..d682bbac823
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssStorageDruidModule.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.Module;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.multibindings.MapBinder;
+import org.apache.druid.data.SearchableVersionedDataFinder;
+import org.apache.druid.data.input.aliyun.OssClientConfig;
+import org.apache.druid.guice.Binders;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.initialization.DruidModule;
+
+import java.util.List;
+
+/**
+ * Druid module for the aliyun OSS extension. It registers the {@link OssLoadSpec} Jackson subtype, binds the
+ * segment killer/mover/archiver/pusher, version finder and task-log implementations under the OSS schemes,
+ * binds the related configuration classes, and provides the shared {@link OSS} client.
+ */
+public class OssStorageDruidModule implements DruidModule
+{
+  public static final String SCHEME = "oss";
+  public static final String SCHEME_ZIP = "oss_zip";
+
+  /**
+   * Returns a single anonymous Jackson module whose only job is to register {@link OssLoadSpec} as a subtype.
+   */
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return ImmutableList.of(
+        new Module()
+        {
+          @Override
+          public String getModuleName()
+          {
+            // The identity hash is appended so that each instance reports a distinct module name.
+            return "DruidAliyunOss-" + System.identityHashCode(this);
+          }
+
+          @Override
+          public Version version()
+          {
+            return Version.unknownVersion();
+          }
+
+          @Override
+          public void setupModule(SetupContext context)
+          {
+            context.registerSubtypes(OssLoadSpec.class);
+          }
+        }
+    );
+  }
+
+  /**
+   * Installs the OSS bindings: implementations keyed by {@link #SCHEME} or {@link #SCHEME_ZIP}, and the
+   * configuration classes keyed by their property prefixes.
+   */
+  @Override
+  public void configure(Binder binder)
+  {
+    MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class)
+             .addBinding(SCHEME)
+             .to(OssTimestampVersionedDataFinder.class)
+             .in(LazySingleton.class);
+    Binders.dataSegmentKillerBinder(binder)
+           .addBinding(SCHEME_ZIP)
+           .to(OssDataSegmentKiller.class)
+           .in(LazySingleton.class);
+    Binders.dataSegmentMoverBinder(binder)
+           .addBinding(SCHEME_ZIP)
+           .to(OssDataSegmentMover.class)
+           .in(LazySingleton.class);
+    Binders.dataSegmentArchiverBinder(binder)
+           .addBinding(SCHEME_ZIP)
+           .to(OssDataSegmentArchiver.class)
+           .in(LazySingleton.class);
+    Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(OssDataSegmentPusher.class).in(LazySingleton.class);
+    JsonConfigProvider.bind(binder, "druid.oss", OssClientConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage.oss", OssInputDataConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage.oss", OssStorageConfig.class);
+    JsonConfigProvider.bind(binder, "druid.storage.oss", OssDataSegmentArchiverConfig.class);
+
+    Binders.taskLogsBinder(binder).addBinding(SCHEME).to(OssTaskLogs.class);
+    JsonConfigProvider.bind(binder, "druid.indexer.logs.oss", OssTaskLogsConfig.class);
+    binder.bind(OssTaskLogs.class).in(LazySingleton.class);
+  }
+
+  /**
+   * Provides the lazily-created, singleton {@link OSS} client built from the bound {@link OssClientConfig}.
+   */
+  @Provides
+  @LazySingleton
+  public OSS initializeOssClient(OssClientConfig inputSourceConfig)
+  {
+    return inputSourceConfig.buildClient();
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
new file mode 100644
index 00000000000..515d85096e0
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogs.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Date;
+
+/**
+ * Provides task logs archived in aliyun OSS
+ *
+ * Log and report files are stored under {@code <prefix>/<taskid>/<filename>} in the configured bucket.
+ */
+public class OssTaskLogs implements TaskLogs
+{
+  private static final Logger log = new Logger(OssTaskLogs.class);
+
+  private final OSS client;
+  private final OssTaskLogsConfig config;
+  private final OssInputDataConfig inputDataConfig;
+  private final CurrentTimeMillisSupplier timeSupplier;
+
+  @Inject
+  public OssTaskLogs(
+      OSS service,
+      OssTaskLogsConfig config,
+      OssInputDataConfig inputDataConfig,
+      CurrentTimeMillisSupplier timeSupplier
+  )
+  {
+    this.client = service;
+    this.config = config;
+    this.inputDataConfig = inputDataConfig;
+    this.timeSupplier = timeSupplier;
+  }
+
+  /**
+   * Streams the "log" file of the given task, starting at {@code offset}.
+   *
+   * @return the log contents, or absent if the object or bucket does not exist
+   */
+  @Override
+  public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "log");
+    return streamTaskFile(offset, taskKey);
+  }
+
+  /**
+   * Streams the whole "report.json" file of the given task.
+   *
+   * @return the report contents, or absent if the object or bucket does not exist
+   */
+  @Override
+  public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "report.json");
+    return streamTaskFile(0, taskKey);
+  }
+
+  /**
+   * Looks up the object's metadata and returns a {@link ByteSource} that reads it from the resolved start
+   * position to the end. A positive offset inside the object is an absolute start; a negative offset whose
+   * magnitude is smaller than the object length counts back from the end; anything else reads from the start.
+   *
+   * @throws IOException if the metadata lookup fails for a reason other than a missing key or bucket
+   */
+  private Optional<ByteSource> streamTaskFile(final long offset, String taskKey) throws IOException
+  {
+    try {
+      final ObjectMetadata objectMetadata = client.getObjectMetadata(config.getBucket(), taskKey);
+
+      return Optional.of(
+          new ByteSource()
+          {
+            @Override
+            public InputStream openStream() throws IOException
+            {
+              try {
+                final long contentLength = objectMetadata.getContentLength();
+                final long end = contentLength - 1;
+                final long start;
+
+                if (offset > 0 && offset < contentLength) {
+                  start = offset;
+                } else if (offset < 0 && offset > -contentLength) {
+                  // Compared as offset > -contentLength rather than -offset < contentLength, because
+                  // negating offset overflows for Long.MIN_VALUE and would yield a negative start.
+                  start = contentLength + offset;
+                } else {
+                  start = 0;
+                }
+
+                final GetObjectRequest request = new GetObjectRequest(config.getBucket(), taskKey);
+                // Only read the object if it still has the ETag seen in the metadata lookup.
+                request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
+                request.setRange(start, end);
+
+                return client.getObject(request).getObjectContent();
+              }
+              catch (OSSException e) {
+                throw new IOException(e);
+              }
+            }
+          }
+      );
+    }
+    catch (OSSException e) {
+      if ("NoSuchKey".equals(e.getErrorCode())
+          || "NoSuchBucket".equals(e.getErrorCode())) {
+        return Optional.absent();
+      } else {
+        throw new IOE(e, "Failed to stream logs from: %s", taskKey);
+      }
+    }
+  }
+
+  @Override
+  public void pushTaskLog(final String taskid, final File logFile) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "log");
+    log.info("Pushing task log %s to: %s", logFile, taskKey);
+    pushTaskFile(logFile, taskKey);
+  }
+
+  @Override
+  public void pushTaskReports(String taskid, File reportFile) throws IOException
+  {
+    final String taskKey = getTaskLogKey(taskid, "report.json");
+    log.info("Pushing task reports %s to: %s", reportFile, taskKey);
+    pushTaskFile(reportFile, taskKey);
+  }
+
+  /**
+   * Uploads the file to the configured bucket under {@code taskKey}, with retries. An {@link IOException} is
+   * rethrown as-is; any other exception is wrapped in a {@link RuntimeException}.
+   */
+  private void pushTaskFile(final File logFile, String taskKey) throws IOException
+  {
+    try {
+      OssUtils.retry(
+          () -> {
+            OssUtils.uploadFileIfPossible(client, config.getBucket(), taskKey, logFile);
+            return null;
+          }
+      );
+    }
+    catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, IOException.class);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds the object key {@code <prefix>/<taskid>/<filename>}.
+   */
+  String getTaskLogKey(String taskid, String filename)
+  {
+    return StringUtils.format("%s/%s/%s", config.getPrefix(), taskid, filename);
+  }
+
+  /**
+   * Deletes every task log last modified before the current time reported by the time supplier.
+   */
+  @Override
+  public void killAll() throws IOException
+  {
+    log.info(
+        "Deleting all task logs from aliyun OSS location [bucket: '%s' prefix: '%s'].",
+        config.getBucket(),
+        config.getPrefix()
+    );
+
+    long now = timeSupplier.getAsLong();
+    killOlderThan(now);
+  }
+
+  /**
+   * Deletes the objects under the configured prefix whose last-modified time is strictly before
+   * {@code timestamp} (milliseconds since the epoch).
+   *
+   * @throws IOException wrapping any failure raised while deleting
+   */
+  @Override
+  public void killOlderThan(long timestamp) throws IOException
+  {
+    log.info(
+        "Deleting all task logs from aliyun OSS location [bucket: '%s' prefix: '%s'] older than %s.",
+        config.getBucket(),
+        config.getPrefix(),
+        new Date(timestamp)
+    );
+    try {
+      OssUtils.deleteObjectsInPath(
+          client,
+          inputDataConfig,
+          config.getBucket(),
+          config.getPrefix(),
+          (object) -> object.getLastModified().getTime() < timestamp
+      );
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting task log files from aliyun OSS. Error: %s", e.getMessage());
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogsConfig.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogsConfig.java
new file mode 100644
index 00000000000..3a3e5c37c1b
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTaskLogsConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Configuration bean for task logs kept in aliyun OSS: a required bucket, a required prefix, and a
+ * {@code disableAcl} flag that defaults to false. Setters are package-private and intended for tests.
+ */
+public class OssTaskLogsConfig
+{
+  @JsonProperty
+  @NotNull
+  private String bucket;
+
+  @JsonProperty
+  @NotNull
+  private String prefix;
+
+  @JsonProperty
+  private boolean disableAcl;
+
+  public String getBucket()
+  {
+    return this.bucket;
+  }
+
+  public String getPrefix()
+  {
+    return this.prefix;
+  }
+
+  public boolean getDisableAcl()
+  {
+    return this.disableAcl;
+  }
+
+  @VisibleForTesting
+  void setBucket(String bucket)
+  {
+    this.bucket = bucket;
+  }
+
+  @VisibleForTesting
+  void setPrefix(String prefix)
+  {
+    this.prefix = prefix;
+  }
+
+  @VisibleForTesting
+  void setDisableAcl(boolean disableAcl)
+  {
+    this.disableAcl = disableAcl;
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinder.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinder.java
new file mode 100644
index 00000000000..0b3f708477d
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.google.inject.Inject;
+import org.apache.druid.data.SearchableVersionedDataFinder;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+/**
+ * Finds, among the aliyun OSS objects listed under a URI, the one with the most recent last-modified time.
+ */
+public class OssTimestampVersionedDataFinder extends OssDataSegmentPuller implements SearchableVersionedDataFinder<URI>
+{
+  @Inject
+  public OssTimestampVersionedDataFinder(OSS client)
+  {
+    super(client);
+  }
+
+  /**
+   * Gets the key with the most recently modified timestamp.
+   * `pattern` is evaluated against the entire key AFTER the path given in `uri`.
+   * The substring `pattern` is matched against will have a leading `/` removed.
+   * For example `oss://some_bucket/some_prefix/some_key` with a URI of `oss://some_bucket/some_prefix` will match against `some_key`.
+   * `oss://some_bucket/some_prefixsome_key` with a URI of `oss://some_bucket/some_prefix` will match against `some_key`
+   * `oss://some_bucket/some_prefix//some_key` with a URI of `oss://some_bucket/some_prefix` will match against `/some_key`
+   *
+   * @param uri     The URI in the form of `oss://some_bucket/some_key`
+   * @param pattern The pattern matcher to determine if a *key* is of interest, or `null` to match everything.
+   * @return A URI to the most recently modified object which matched the pattern, or `null` if no object matched.
+   */
+  @Override
+  public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern)
+  {
+    try {
+      final CloudObjectLocation coords = new CloudObjectLocation(OssUtils.checkURI(uri));
+      long mostRecent = Long.MIN_VALUE;
+      URI latest = null;
+      final Iterator<OSSObjectSummary> objectSummaryIterator = OssUtils.objectSummaryIterator(
+          client,
+          Collections.singletonList(uri),
+          OssUtils.MAX_LISTING_LENGTH
+      );
+      while (objectSummaryIterator.hasNext()) {
+        final OSSObjectSummary objectSummary = objectSummaryIterator.next();
+        final CloudObjectLocation objectLocation = OssUtils.summaryToCloudObjectLocation(objectSummary);
+        // remove coords path prefix from object path
+        String keyString = StringUtils.maybeRemoveLeadingSlash(
+            objectLocation.getPath().substring(coords.getPath().length())
+        );
+        if (pattern != null && !pattern.matcher(keyString).matches()) {
+          continue;
+        }
+        final long latestModified = objectSummary.getLastModified().getTime();
+        // ">=" means that among objects sharing the newest timestamp, the one listed last wins.
+        if (latestModified >= mostRecent) {
+          mostRecent = latestModified;
+          latest = objectLocation.toUri(OssStorageDruidModule.SCHEME);
+        }
+      }
+      return latest;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssUtils.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssUtils.java
new file mode 100644
index 00000000000..1a707c78557
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.RetryUtils.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class OssUtils
+{
+ // URI scheme used for the URIs this class builds and validates.
+ private static final String SCHEME = OssStorageDruidModule.SCHEME;
+ // Joins key components with '/', leaving out null components.
+ private static final Joiner JOINER = Joiner.on("/").skipNulls();
+ private static final Logger log = new Logger(OssUtils.class);
+ // Largest number of objects requested per listing call.
+ public static final int MAX_LISTING_LENGTH = 1000; //limited by Aliyun OSS SDK
+
+
+ /**
+  * Decides whether a failed aliyun OSS call is worth retrying.
+  *
+  * @param ex the exception raised by the OSS client
+  * @return true when the exception was caused by an {@link IOException} or carries the
+  * {@code RequestTimeout} error code; false otherwise
+  */
+ static boolean isServiceExceptionRecoverable(OSSException ex)
+ {
+   final boolean isIOException = ex.getCause() instanceof IOException;
+   final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
+   return isIOException || isTimeout;
+ }
+
+ /**
+  * Tells whether a failure should be retried. The cause chain is searched for the first
+  * {@link IOException} (always retryable) or {@link OSSException} (retryable when
+  * {@link #isServiceExceptionRecoverable} says so); a chain containing neither is not retryable.
+  */
+ public static final Predicate<Throwable> RETRYABLE = new Predicate<Throwable>()
+ {
+   @Override
+   public boolean apply(Throwable e)
+   {
+     // Iterative walk of the cause chain; the first decisive exception type settles the answer.
+     for (Throwable current = e; current != null; current = current.getCause()) {
+       if (current instanceof IOException) {
+         return true;
+       }
+       if (current instanceof OSSException) {
+         return isServiceExceptionRecoverable((OSSException) current);
+       }
+     }
+     return false;
+   }
+ };
+
+ /**
+  * Retries aliyun OSS operations that fail due to io-related exceptions. Service-level exceptions (access denied,
+  * file not found, etc) are not retried.
+  *
+  * @param f   the operation to run
+  * @param <T> the operation's result type
+  * @return the value produced by {@code f}
+  * @throws Exception whatever {@link RetryUtils#retry} propagates once {@link #RETRYABLE} rejects the failure or
+  *                   {@link RetryUtils#DEFAULT_MAX_TRIES} is exhausted
+  */
+ static <T> T retry(Task<T> f) throws Exception
+ {
+   return RetryUtils.retry(f, RETRYABLE, RetryUtils.DEFAULT_MAX_TRIES);
+ }
+
+ /**
+  * Checks whether an object exists in a bucket, treating one specific service error as "exists".
+  *
+  * @param client     aliyun OSS client
+  * @param bucketName bucket to look in
+  * @param objectKey  key of the object
+  * @return the result of {@link OSS#doesObjectExist}, or true when the call fails with error code
+  * {@code NoSuchKey}
+  * @throws OSSException for every other service failure
+  */
+ static boolean isObjectInBucketIgnoringPermission(
+     OSS client,
+     String bucketName,
+     String objectKey
+ )
+ {
+   try {
+     return client.doesObjectExist(bucketName, objectKey);
+   }
+   catch (OSSException e) {
+     // Constant-first comparison: an exception without an error code must be rethrown as-is rather than
+     // replaced by a NullPointerException.
+     // NOTE(review): "NoSuchKey" usually signals a missing object, yet the comment below describes a
+     // permission failure - confirm which error code is intended here.
+     if ("NoSuchKey".equals(e.getErrorCode())) {
+       // Object is inaccessible to current user, but does exist.
+       return true;
+     }
+     // Something else has gone wrong
+     throw e;
+   }
+ }
+
+ /**
+  * Create an iterator over a set of aliyun OSS objects specified by a set of prefixes.
+  *
+  * For each provided prefix URI, the iterator will walk through all objects that are in the same bucket as the
+  * provided URI and whose keys start with that URI's path, except for directory placeholders (which will be
+  * ignored). The iterator is computed incrementally by calling {@link OSS#listObjects} for
+  * each prefix in batches of {@code maxListingLength}. The first call is made at the same time the iterator is
+  * constructed.
+  *
+  * @param client           aliyun OSS client
+  * @param prefixes         prefix URIs to list
+  * @param maxListingLength number of objects requested per listing call
+  * @return an iterator over the matching object summaries
+  */
+ public static Iterator<OSSObjectSummary> objectSummaryIterator(
+     final OSS client,
+     final Iterable<URI> prefixes,
+     final int maxListingLength
+ )
+ {
+   return new OssObjectSummaryIterator(client, prefixes, maxListingLength);
+ }
+
+ /**
+  * Builds the {@link URI} identifying the given {@link OSSObjectSummary}. The URI has the form
+  * {@code oss://{BUCKET_NAME}/{OBJECT_KEY}}.
+  *
+  * @param object summary of the object to address
+  * @return the object's URI
+  */
+ public static URI summaryToUri(OSSObjectSummary object)
+ {
+   final CloudObjectLocation location = summaryToCloudObjectLocation(object);
+   return location.toUri(SCHEME);
+ }
+
+ /**
+  * Converts an {@link OSSObjectSummary} into a {@link CloudObjectLocation} carrying the same bucket and key.
+  */
+ public static CloudObjectLocation summaryToCloudObjectLocation(OSSObjectSummary object)
+ {
+   final String bucket = object.getBucketName();
+   final String key = object.getKey();
+   return new CloudObjectLocation(bucket, key);
+ }
+
+ static String constructSegmentPath(String baseKey, String storageDir)
+ {
+ return JOINER.join(
+ baseKey.isEmpty() ? null : baseKey,
+ storageDir
+ ) + "/index.zip";
+ }
+
+ /**
+  * Returns the object key encoded in a URI, i.e. the URI's path passed through
+  * {@link StringUtils#maybeRemoveLeadingSlash}.
+  */
+ public static String extractKey(URI uri)
+ {
+   final String path = uri.getPath();
+   return StringUtils.maybeRemoveLeadingSlash(path);
+ }
+
+ /**
+  * Normalizes and validates the scheme of an OSS URI. A URI using the zip scheme
+  * ({@link OssStorageDruidModule#SCHEME_ZIP}) is rewritten to the plain scheme; the result is then handed to
+  * {@link CloudObjectLocation#validateUriScheme}.
+  *
+  * @param uri the URI to check
+  * @return the URI returned by the scheme validation
+  */
+ public static URI checkURI(URI uri)
+ {
+   URI normalized = uri;
+   // Constant-first comparison: a URI without a scheme reaches the validation below instead of failing here
+   // with a NullPointerException.
+   if (OssStorageDruidModule.SCHEME_ZIP.equalsIgnoreCase(uri.getScheme())) {
+     normalized = URI.create(SCHEME + uri.toString().substring(OssStorageDruidModule.SCHEME_ZIP.length()));
+   }
+   return CloudObjectLocation.validateUriScheme(SCHEME, normalized);
+ }
+
+ /**
+  * Fetches the {@link OSSObjectSummary} of one object by listing the bucket with the key as prefix and a limit
+  * of one result. Because the listing may return a different object whose key merely starts with the given key,
+  * use this only when the key is guaranteed to be unique in the bucket.
+  *
+  * @param client aliyun OSS client
+  * @param bucket aliyun OSS bucket
+  * @param key    unique key for the object to be retrieved
+  * @return the summary of the object stored under exactly {@code bucket} and {@code key}
+  * @throws ISE if the listing is empty or its first entry has a different bucket or key
+  */
+ public static OSSObjectSummary getSingleObjectSummary(OSS client, String bucket, String key)
+ {
+   final ListObjectsRequest listRequest = new ListObjectsRequest();
+   listRequest.setBucketName(bucket);
+   listRequest.setPrefix(key);
+   listRequest.setMaxKeys(1);
+   final ObjectListing listing = client.listObjects(listRequest);
+
+   // Using getObjectSummaries() instead of getKeyCount as, in some cases
+   // it is observed that even though the getObjectSummaries returns some data
+   // keyCount is still zero.
+   if (listing.getObjectSummaries().isEmpty()) {
+     throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
+   }
+   final OSSObjectSummary first = listing.getObjectSummaries().get(0);
+   final boolean exactMatch = first.getBucketName().equals(bucket) && first.getKey().equals(key);
+   if (!exactMatch) {
+     throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", first, bucket, key);
+   }
+
+   return first;
+ }
+
+ /**
+  * Delete the files from aliyun OSS in a specified bucket, matching a specified prefix and filter.
+  * Matching keys are collected and deleted in batches of {@code config.getMaxListingLength()}.
+  *
+  * @param client aliyun OSS client
+  * @param config specifies the configuration to use when finding matching files in aliyun OSS to delete
+  * @param bucket aliyun OSS bucket
+  * @param prefix the file prefix
+  * @param filter function which returns true if the prefix file found should be deleted and false otherwise.
+  * @throws Exception if a batch delete fails and is not (or no longer) retried
+  */
+ public static void deleteObjectsInPath(
+     OSS client,
+     OssInputDataConfig config,
+     String bucket,
+     String prefix,
+     Predicate<OSSObjectSummary> filter
+ )
+     throws Exception
+ {
+   final int batchSize = config.getMaxListingLength();
+   final List<String> keysToDelete = new ArrayList<>(batchSize);
+   // The listing URI uses this class's own scheme, like every other URI built here.
+   final OssObjectSummaryIterator iterator = new OssObjectSummaryIterator(
+       client,
+       ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri(SCHEME)),
+       batchSize
+   );
+
+   while (iterator.hasNext()) {
+     final OSSObjectSummary nextObject = iterator.next();
+     if (filter.apply(nextObject)) {
+       keysToDelete.add(nextObject.getKey());
+       // Flush as soon as a full batch has been collected.
+       if (keysToDelete.size() == batchSize) {
+         deleteBucketKeys(client, bucket, keysToDelete);
+         log.info("Deleted %d files", keysToDelete.size());
+         keysToDelete.clear();
+       }
+     }
+   }
+
+   // Delete whatever is left over from the last, partial batch.
+   if (!keysToDelete.isEmpty()) {
+     deleteBucketKeys(client, bucket, keysToDelete);
+     log.info("Deleted %d files", keysToDelete.size());
+   }
+ }
+
+ /**
+  * Deletes the given keys from a bucket with one multi-object delete request, retried through {@link #retry}.
+  *
+  * @param client       aliyun OSS client
+  * @param bucket       bucket holding the objects
+  * @param keysToDelete keys of the objects to delete
+  * @throws Exception if the delete fails and is not (or no longer) retried
+  */
+ private static void deleteBucketKeys(
+     OSS client,
+     String bucket,
+     List<String> keysToDelete
+ )
+     throws Exception
+ {
+   DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
+   OssUtils.retry(() -> {
+     client.deleteObjects(deleteRequest);
+     return null;
+   });
+ }
+
+ /**
+ * Uploads a local file to aliyun OSS with a single put request. No ACL is set on the request.
+ *
+ * @param client aliyun OSS client
+ * @param bucket The bucket in which to store the new object.
+ * @param key The key under which to store the new object.
+ * @param file The path of the file to upload to aliyun OSS.
+ */
+ static void uploadFileIfPossible(
+ OSS client,
+ String bucket,
+ String key,
+ File file
+ )
+ {
+ final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file);
+
+ log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
+ client.putObject(putObjectRequest);
+ }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/aliyun-oss-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 00000000000..3d434e7c902
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.storage.aliyun.OssStorageDruidModule
+org.apache.druid.firehose.aliyun.OssFirehoseDruidModule
+org.apache.druid.data.input.aliyun.OssInputSourceDruidModule
\ No newline at end of file
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
new file mode 100644
index 00000000000..2bd9d5816ac
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.guice.ObjectMapperModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Provides;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.metadata.DefaultPasswordProvider;
+import org.apache.druid.storage.aliyun.OssInputDataConfig;
+import org.apache.druid.storage.aliyun.OssUtils;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.utils.CompressionUtils;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class OssInputSourceTest extends InitializedNullHandlingTest
+{
+ private static final ObjectMapper MAPPER = createObjectMapper();
+ // One shared mock client; tests that set expectations reset it first.
+ private static final OSS OSSCLIENT = EasyMock.createMock(OSSClient.class);
+ private static final OssInputDataConfig INPUT_DATA_CONFIG;
+ private static final int MAX_LISTING_LENGTH = 10;
+
+ private static final List<URI> EXPECTED_URIS = Arrays.asList(
+     URI.create("oss://foo/bar/file.csv"),
+     URI.create("oss://bar/foo/file2.csv")
+ );
+
+ private static final List<URI> EXPECTED_COMPRESSED_URIS = Arrays.asList(
+     URI.create("oss://foo/bar/file.csv.gz"),
+     URI.create("oss://bar/foo/file2.csv.gz")
+ );
+
+ // One single-element split per expected URI.
+ private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
+     EXPECTED_URIS.stream()
+                  .map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
+                  .collect(Collectors.toList());
+
+ private static final List<URI> PREFIXES = Arrays.asList(
+     URI.create("oss://foo/bar"),
+     URI.create("oss://bar/foo")
+ );
+
+ private static final OssClientConfig CLOUD_CONFIG_PROPERTIES = new OssClientConfig(
+     "test.oss-cn.aliyun.com",
+     new DefaultPasswordProvider("myKey"),
+     new DefaultPasswordProvider("mySecret"));
+
+ private static final List<CloudObjectLocation> EXPECTED_LOCATION =
+     ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
+
+ private static final DateTime NOW = DateTimes.nowUtc();
+ // A single CSV row: timestamp, dim1, dim2.
+ private static final byte[] CONTENT =
+     StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
+
+ static {
+   INPUT_DATA_CONFIG = new OssInputDataConfig();
+   INPUT_DATA_CONFIG.setMaxListingLength(MAX_LISTING_LENGTH);
+ }
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ /**
+  * An input source configured with explicit URIs must survive a JSON round trip unchanged.
+  */
+ @Test
+ public void testSerdeWithUris() throws Exception
+ {
+   final OssInputSource original = new OssInputSource(OSSCLIENT, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null, null);
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+ }
+
+ /**
+  * An input source configured with prefixes must survive a JSON round trip unchanged.
+  */
+ @Test
+ public void testSerdeWithPrefixes() throws Exception
+ {
+   final OssInputSource original = new OssInputSource(OSSCLIENT, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+ }
+
+ /**
+  * An input source configured with explicit object locations must survive a JSON round trip unchanged.
+  */
+ @Test
+ public void testSerdeWithObjects() throws Exception
+ {
+   final OssInputSource original = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       null,
+       EXPECTED_LOCATION,
+       null
+   );
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+ }
+
+ /**
+  * Creating an entity with a client config that reports no configured credentials must still go through
+  * the config's {@code buildClient()}, as checked by the mock verification.
+  */
+ @Test
+ public void testInputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential()
+ {
+   final OssClientConfig configWithoutCredentials = EasyMock.createMock(OssClientConfig.class);
+   EasyMock.reset(configWithoutCredentials);
+   EasyMock.expect(configWithoutCredentials.isCredentialsConfigured())
+           .andStubReturn(false);
+   EasyMock.expect(configWithoutCredentials.buildClient())
+           .andReturn(OSSCLIENT);
+   EasyMock.replay(configWithoutCredentials);
+
+   final OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       null,
+       EXPECTED_LOCATION,
+       configWithoutCredentials
+   );
+   Assert.assertNotNull(inputSource);
+
+   inputSource.createEntity(new CloudObjectLocation("bucket", "path"));
+   EasyMock.verify(configWithoutCredentials);
+ }
+
+ /**
+  * An input source carrying explicit client credentials must survive a JSON round trip unchanged.
+  */
+ @Test
+ public void testSerdeOssClientLazyInitializedWithCrediential() throws Exception
+ {
+   // Strict mock with no expectations; it is not handed to the input source.
+   final OssClientConfig untouchedConfig = EasyMock.createMock(OssClientConfig.class);
+   EasyMock.replay(untouchedConfig);
+
+   final OssInputSource original = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       null,
+       EXPECTED_LOCATION,
+       CLOUD_CONFIG_PROPERTIES
+   );
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+   EasyMock.verify(untouchedConfig);
+ }
+
+ /**
+  * An input source without a client config must survive a JSON round trip unchanged.
+  */
+ @Test
+ public void testSerdeOssClientLazyInitializedWithoutCrediential() throws Exception
+ {
+   // Strict mock with no expectations; it is not handed to the input source.
+   final OssClientConfig untouchedConfig = EasyMock.createMock(OssClientConfig.class);
+   EasyMock.replay(untouchedConfig);
+
+   final OssInputSource original = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       null,
+       EXPECTED_LOCATION,
+       null
+   );
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+   EasyMock.verify(untouchedConfig);
+ }
+
+ /**
+  * Empty URI and prefix lists next to explicit object locations are accepted and round-trip through JSON.
+  */
+ @Test
+ public void testSerdeWithExtraEmptyLists() throws Exception
+ {
+   final OssInputSource original = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       ImmutableList.of(),
+       ImmutableList.of(),
+       EXPECTED_LOCATION,
+       null
+   );
+   final String json = MAPPER.writeValueAsString(original);
+   final OssInputSource roundTripped = MAPPER.readValue(json, OssInputSource.class);
+   Assert.assertEquals(original, roundTripped);
+ }
+
+ /**
+  * Supplying URIs, prefixes and objects all at once must be rejected by the constructor.
+  */
+ @Test
+ public void testSerdeWithInvalidArgs()
+ {
+   expectedException.expect(IllegalArgumentException.class);
+   // the constructor is expected to throw
+   new OssInputSource(OSSCLIENT, INPUT_DATA_CONFIG, EXPECTED_URIS, PREFIXES, EXPECTED_LOCATION, null);
+ }
+
+ /**
+  * Supplying both URIs and prefixes (with an empty object list) must be rejected by the constructor.
+  */
+ @Test
+ public void testSerdeWithOtherInvalidArgs()
+ {
+   expectedException.expect(IllegalArgumentException.class);
+   // the constructor is expected to throw
+   new OssInputSource(OSSCLIENT, INPUT_DATA_CONFIG, EXPECTED_URIS, PREFIXES, ImmutableList.of(), null);
+ }
+
+ /**
+  * Supplying both prefixes and objects (with an empty URI list) must be rejected by the constructor.
+  */
+ @Test
+ public void testSerdeWithOtherOtherInvalidArgs()
+ {
+   expectedException.expect(IllegalArgumentException.class);
+   // the constructor is expected to throw
+   new OssInputSource(OSSCLIENT, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, EXPECTED_LOCATION, null);
+ }
+
+ /**
+  * With explicit URIs and no split hint, every URI becomes its own split.
+  */
+ @Test
+ public void testWithUrisSplit()
+ {
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       EXPECTED_URIS,
+       null,
+       null,
+       null
+   );
+
+   Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
+       new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+       null
+   );
+
+   Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
+ }
+
+ /**
+  * With prefixes and a one-byte split size, every listed object becomes its own split.
+  */
+ @Test
+ public void testWithPrefixesSplit()
+ {
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+   expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
+   EasyMock.replay(OSSCLIENT);
+
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       PREFIXES,
+       null,
+       null
+   );
+
+   Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
+       new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+       new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
+   );
+
+   Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
+   EasyMock.verify(OSSCLIENT);
+ }
+
+ /**
+  * With a split size of three times the object size, both listed objects end up in one split.
+  */
+ @Test
+ public void testCreateSplitsWithSplitHintSpecRespectingHint()
+ {
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+   expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
+   EasyMock.replay(OSSCLIENT);
+
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       PREFIXES,
+       null,
+       null
+   );
+
+   Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
+       new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+       new MaxSizeSplitHintSpec(CONTENT.length * 3L)
+   );
+
+   Assert.assertEquals(
+       ImmutableList.of(EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList())),
+       splits.map(InputSplit::get).collect(Collectors.toList())
+   );
+   EasyMock.verify(OSSCLIENT);
+ }
+
+ /**
+  * A listed object of size zero is left out of the splits; only the non-empty object is returned.
+  */
+ @Test
+ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects()
+ {
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+   expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), new byte[0]);
+   EasyMock.replay(OSSCLIENT);
+
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       PREFIXES,
+       null,
+       null
+   );
+
+   Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
+       new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+       null
+   );
+   Assert.assertEquals(
+       ImmutableList.of(ImmutableList.of(new CloudObjectLocation(EXPECTED_URIS.get(0)))),
+       splits.map(InputSplit::get).collect(Collectors.toList())
+   );
+   EasyMock.verify(OSSCLIENT);
+ }
+
+ /**
+  * A listing failure on the second prefix must surface with the bucket and prefix in the message and the
+  * original exception as the cause.
+  */
+ @Test
+ public void testAccessDeniedWhileListingPrefix()
+ {
+   final URI readablePrefix = PREFIXES.get(0);
+   final URI deniedPrefix = EXPECTED_URIS.get(1);
+
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(readablePrefix, ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+   expectListObjectsAndThrowAccessDenied(deniedPrefix);
+   EasyMock.replay(OSSCLIENT);
+
+   final OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       ImmutableList.of(readablePrefix, deniedPrefix),
+       null,
+       null
+   );
+
+   expectedException.expectMessage("Failed to get object summaries from aliyun OSS bucket[bar], prefix[foo/file2.csv]");
+   expectedException.expectCause(
+       ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("can't list that bucket"))
+   );
+
+   // Collecting forces the lazy listing that is expected to fail.
+   inputSource
+       .createSplits(new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), null)
+       .collect(Collectors.toList());
+ }
+
+ /**
+  * Reads CSV rows from objects found under one prefix and one exact key, and checks every parsed row.
+  */
+ @Test
+ public void testReader() throws IOException
+ {
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
+   expectListObjects(EXPECTED_URIS.get(1), ImmutableList.of(EXPECTED_URIS.get(1)), CONTENT);
+   expectGetObject(EXPECTED_URIS.get(0));
+   expectGetObject(EXPECTED_URIS.get(1));
+   EasyMock.replay(OSSCLIENT);
+
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
+       null,
+       null
+   );
+
+   InputRowSchema someSchema = new InputRowSchema(
+       new TimestampSpec("time", "auto", null),
+       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
+       ImmutableList.of("count")
+   );
+
+   InputSourceReader reader = inputSource.reader(
+       someSchema,
+       new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+       temporaryFolder.newFolder()
+   );
+
+   // try-with-resources closes the iterator even when an assertion fails.
+   try (CloseableIterator<InputRow> iterator = reader.read()) {
+     while (iterator.hasNext()) {
+       InputRow nextRow = iterator.next();
+       Assert.assertEquals(NOW, nextRow.getTimestamp());
+       Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
+       Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
+     }
+   }
+
+   EasyMock.verify(OSSCLIENT);
+ }
+
+ /**
+  * Same as {@code testReader}, but the objects are served gzip-compressed under {@code .gz} keys.
+  */
+ @Test
+ public void testCompressedReader() throws IOException
+ {
+   EasyMock.reset(OSSCLIENT);
+   expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)), CONTENT);
+   expectListObjects(EXPECTED_COMPRESSED_URIS.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)), CONTENT);
+   expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(0));
+   expectGetObjectCompressed(EXPECTED_COMPRESSED_URIS.get(1));
+   EasyMock.replay(OSSCLIENT);
+
+   OssInputSource inputSource = new OssInputSource(
+       OSSCLIENT,
+       INPUT_DATA_CONFIG,
+       null,
+       ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
+       null,
+       null
+   );
+
+   InputRowSchema someSchema = new InputRowSchema(
+       new TimestampSpec("time", "auto", null),
+       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
+       ImmutableList.of("count")
+   );
+
+   InputSourceReader reader = inputSource.reader(
+       someSchema,
+       new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+       temporaryFolder.newFolder()
+   );
+
+   // try-with-resources closes the iterator even when an assertion fails.
+   try (CloseableIterator<InputRow> iterator = reader.read()) {
+     while (iterator.hasNext()) {
+       InputRow nextRow = iterator.next();
+       Assert.assertEquals(NOW, nextRow.getTimestamp());
+       Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
+       Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
+     }
+   }
+
+   EasyMock.verify(OSSCLIENT);
+ }
+
+ /**
+  * Records one expected {@code listObjects} call for the given prefix on the shared mock client.
+  *
+  * @param prefix  prefix URI the listing request must target
+  * @param uris    objects the listing returns
+  * @param content bytes whose length is reported as the size of every returned object
+  */
+ private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
+ {
+   final ObjectListing result = new ObjectListing();
+   result.setBucketName(prefix.getAuthority());
+   result.setMaxKeys(uris.size());
+   for (URI uri : uris) {
+     final String bucket = uri.getAuthority();
+     final String key = OssUtils.extractKey(uri);
+     final OSSObjectSummary objectSummary = new OSSObjectSummary();
+     objectSummary.setBucketName(bucket);
+     objectSummary.setKey(key);
+     objectSummary.setSize(content.length);
+     result.getObjectSummaries().add(objectSummary);
+   }
+
+   EasyMock.expect(
+       OSSCLIENT.listObjects(matchListObjectsRequest(prefix))
+   ).andReturn(result).once();
+ }
+
+ private static void expectListObjectsAndThrowAccessDenied(final URI prefix)
+ {
+ OSSException boom = new OSSException("oh dang, you can't list that bucket friend");
+ boom.setRawResponseError("403");
+ EasyMock.expect(
+ OSSCLIENT.listObjects(matchListObjectsRequest(prefix))
+ ).andThrow(boom).once();
+ }
+
+ /**
+  * Records one expected {@code getObject} call (for any request) that returns an object whose content is
+  * {@code CONTENT} and whose bucket and key come from the given URI.
+  */
+ private static void expectGetObject(URI uri)
+ {
+   final OSSObject stubbedObject = new OSSObject();
+   stubbedObject.setBucketName(uri.getAuthority());
+   stubbedObject.setKey(OssUtils.extractKey(uri));
+   stubbedObject.setObjectContent(new ByteArrayInputStream(CONTENT));
+
+   EasyMock.expect(OSSCLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class)))
+           .andReturn(stubbedObject)
+           .once();
+ }
+
+ /**
+  * Records one expected {@code getObject} call (for any request) that returns an object whose content is the
+  * gzip-compressed form of {@code CONTENT} and whose bucket and key come from the given URI.
+  */
+ private static void expectGetObjectCompressed(URI uri) throws IOException
+ {
+   final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+   CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), compressed);
+
+   final OSSObject stubbedObject = new OSSObject();
+   stubbedObject.setBucketName(uri.getAuthority());
+   stubbedObject.setKey(OssUtils.extractKey(uri));
+   stubbedObject.setObjectContent(new ByteArrayInputStream(compressed.toByteArray()));
+
+   EasyMock.expect(OSSCLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class)))
+           .andReturn(stubbedObject)
+           .once();
+ }
+
+ /**
+  * Registers an EasyMock argument matcher that accepts a {@link ListObjectsRequest} whose bucket and prefix
+  * equal those encoded in {@code prefixUri}.
+  *
+  * @param prefixUri URI whose authority is the expected bucket and whose path yields the expected prefix
+  * @return always null; the matcher is registered as a side effect and the return value only fills the
+  * argument position of the mocked call
+  */
+ private static ListObjectsRequest matchListObjectsRequest(final URI prefixUri)
+ {
+   // Use an IArgumentMatcher to verify that the request has the correct bucket and prefix.
+   EasyMock.reportMatcher(
+       new IArgumentMatcher()
+       {
+         @Override
+         public boolean matches(Object argument)
+         {
+           if (!(argument instanceof ListObjectsRequest)) {
+             return false;
+           }
+
+           final ListObjectsRequest request = (ListObjectsRequest) argument;
+           return prefixUri.getAuthority().equals(request.getBucketName())
+                  && OssUtils.extractKey(prefixUri).equals(request.getPrefix());
+         }
+
+         @Override
+         public void appendTo(StringBuffer buffer)
+         {
+           // Describe the expected request so a mismatch report names the prefix that was expected.
+           buffer.append("<request for prefix [").append(prefixUri).append("]>");
+         }
+       }
+   );
+
+   return null;
+ }
+
+ /**
+  * Builds the {@link ObjectMapper} used by these tests: the mapper obtained from a Guice injector set up with
+  * {@link ObjectMapperModule} and {@code TestOssModule}, with the test module's Jackson modules registered.
+  */
+ public static ObjectMapper createObjectMapper()
+ {
+   final DruidModule testModule = new TestOssModule();
+   final Injector injector = Guice.createInjector(new ObjectMapperModule(), testModule);
+   final ObjectMapper mapper = injector.getInstance(ObjectMapper.class);
+
+   for (Module jacksonModule : testModule.getJacksonModules()) {
+     mapper.registerModule(jacksonModule);
+   }
+   return mapper;
+ }
+
+ /**
+  * Test module that provides the shared mock client and registers a placeholder deserializer for {@link OSS}.
+  */
+ public static class TestOssModule implements DruidModule
+ {
+   @Override
+   public List<? extends Module> getJacksonModules()
+   {
+     // Deserializer is need for OSS even though it is injected.
+     // See https://github.com/FasterXML/jackson-databind/issues/962.
+     return ImmutableList.of(
+         new SimpleModule()
+             .addDeserializer(OSS.class, new ItemDeserializer<OSSClient>())
+     );
+   }
+
+   @Override
+   public void configure(Binder binder)
+   {
+   }
+
+   @Provides
+   public OSS getOssClient()
+   {
+     return OSSCLIENT;
+   }
+ }
+
+ /**
+  * Placeholder deserializer: it exists only so that a deserializer is registered for the type and throws if it
+  * is ever asked to deserialize.
+  *
+  * @param <T> the type the deserializer is registered for
+  */
+ public static class ItemDeserializer<T> extends StdDeserializer<T>
+ {
+   ItemDeserializer()
+   {
+     this(null);
+   }
+
+   ItemDeserializer(Class<?> vc)
+   {
+     super(vc);
+   }
+
+   @Override
+   public T deserialize(JsonParser jp, DeserializationContext ctxt)
+   {
+     throw new UnsupportedOperationException();
+   }
+ }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentArchiverTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentArchiverTest.java
new file mode 100644
index 00000000000..02a88ec885c
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentArchiverTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClient;
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class OssDataSegmentArchiverTest
+{
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper()
+ .setInjectableValues(
+ new InjectableValues()
+ {
+ @Override
+ public Object findInjectableValue(
+ Object valueId,
+ DeserializationContext ctxt,
+ BeanProperty forProperty,
+ Object beanInstance
+ )
+ {
+ return PULLER;
+ }
+ }
+ )
+ .registerModule(new SimpleModule("aliyun-oss-archive-test-module").registerSubtypes(OssLoadSpec.class));
+ private static final OssDataSegmentArchiverConfig ARCHIVER_CONFIG = new OssDataSegmentArchiverConfig()
+ {
+ @Override
+ public String getArchiveBucket()
+ {
+ return "archive_bucket";
+ }
+
+ @Override
+ public String getArchiveBaseKey()
+ {
+ return "archive_base_key";
+ }
+ };
+ private static final OssStorageConfig PUSHER_CONFIG = new OssStorageConfig();
+ private static final OSS OSS_CLIENT = EasyMock.createStrictMock(OSSClient.class);
+ private static final OssDataSegmentPuller PULLER = new OssDataSegmentPuller(OSS_CLIENT);
+ private static final DataSegment SOURCE_SEGMENT = DataSegment
+ .builder()
+ .binaryVersion(1)
+ .dataSource("dataSource")
+ .dimensions(ImmutableList.of())
+ .interval(Intervals.of("2015/2016"))
+ .version("version")
+ .loadSpec(ImmutableMap.of(
+ "type",
+ OssStorageDruidModule.SCHEME_ZIP,
+ OssDataSegmentPuller.BUCKET,
+ "source_bucket",
+ OssDataSegmentPuller.KEY,
+ "source_key"
+ ))
+ .size(0)
+ .build();
+
+ @BeforeClass
+ public static void setUpStatic()
+ {
+ PUSHER_CONFIG.setPrefix("push_base");
+ PUSHER_CONFIG.setBucket("push_bucket");
+ }
+
+ @Test
+ public void testSimpleArchive() throws Exception
+ {
+   // The result the stubbed move() reports: the source segment with a load spec in the archive location.
+   final DataSegment archivedSegment = SOURCE_SEGMENT
+       .withLoadSpec(ImmutableMap.of(
+           "type", OssStorageDruidModule.SCHEME_ZIP,
+           OssDataSegmentPuller.BUCKET, ARCHIVER_CONFIG.getArchiveBucket(),
+           OssDataSegmentPuller.KEY, ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
+       ));
+   // Override move() so that its outcome is fixed by the test.
+   final OssDataSegmentArchiver archiver = new OssDataSegmentArchiver(
+       MAPPER,
+       OSS_CLIENT,
+       ARCHIVER_CONFIG,
+       PUSHER_CONFIG
+   )
+   {
+     @Override
+     public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
+     {
+       return archivedSegment;
+     }
+   };
+   // archive() is expected to return whatever segment move() produced.
+   Assert.assertEquals(archivedSegment, archiver.archive(SOURCE_SEGMENT));
+ }
+
+ @Test
+ public void testSimpleArchiveDoesntMove() throws Exception
+ {
+   // move() hands back the unchanged source segment, i.e. it reports that nothing moved.
+   final OssDataSegmentArchiver archiver = new OssDataSegmentArchiver(
+       MAPPER, OSS_CLIENT, ARCHIVER_CONFIG, PUSHER_CONFIG
+   )
+   {
+     @Override
+     public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
+     {
+       return SOURCE_SEGMENT;
+     }
+   };
+
+   // In that case archive() is expected to return null.
+   Assert.assertNull(archiver.archive(SOURCE_SEGMENT));
+ }
+
+ @Test
+ public void testSimpleRestore() throws Exception
+ {
+   // The result the stubbed move() reports: the source segment with a different load spec.
+   final DataSegment archivedSegment = SOURCE_SEGMENT
+       .withLoadSpec(ImmutableMap.of(
+           "type", OssStorageDruidModule.SCHEME_ZIP,
+           OssDataSegmentPuller.BUCKET, ARCHIVER_CONFIG.getArchiveBucket(),
+           OssDataSegmentPuller.KEY, ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
+       ));
+   // Override move() so that its outcome is fixed by the test.
+   final OssDataSegmentArchiver archiver = new OssDataSegmentArchiver(
+       MAPPER,
+       OSS_CLIENT,
+       ARCHIVER_CONFIG,
+       PUSHER_CONFIG
+   )
+   {
+     @Override
+     public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
+     {
+       return archivedSegment;
+     }
+   };
+   // restore() is expected to return whatever segment move() produced.
+   Assert.assertEquals(archivedSegment, archiver.restore(SOURCE_SEGMENT));
+ }
+
+ @Test
+ public void testSimpleRestoreDoesntMove() throws Exception
+ {
+   // move() hands back the unchanged source segment, i.e. it reports that nothing moved.
+   final OssDataSegmentArchiver archiver = new OssDataSegmentArchiver(
+       MAPPER, OSS_CLIENT, ARCHIVER_CONFIG, PUSHER_CONFIG
+   )
+   {
+     @Override
+     public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)
+     {
+       return SOURCE_SEGMENT;
+     }
+   };
+
+   // In that case restore() is expected to return null.
+   Assert.assertNull(archiver.restore(SOURCE_SEGMENT));
+ }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentKillerTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentKillerTest.java
new file mode 100644
index 00000000000..638348379a2
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentKillerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+@RunWith(EasyMockRunner.class)
+public class OssDataSegmentKillerTest extends EasyMockSupport
+{
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+ private static final String TEST_BUCKET = "test_bucket";
+ private static final String TEST_PREFIX = "test_prefix";
+ private static final URI PREFIX_URI = URI.create(StringUtils.format(OssStorageDruidModule.SCHEME + "://%s/%s", TEST_BUCKET, TEST_PREFIX));
+ private static final long TIME_0 = 0L;
+ private static final long TIME_1 = 1L;
+ private static final int MAX_KEYS = 1;
+ private static final Exception RECOVERABLE_EXCEPTION = new ClientException(new IOException("mocked by test case"));
+ private static final Exception NON_RECOVERABLE_EXCEPTION = new ClientException(new NullPointerException("mocked by test case"));
+
+ @Mock
+ private OSS client;
+ @Mock
+ private OssStorageConfig segmentPusherConfig;
+ @Mock
+ private OssInputDataConfig inputDataConfig;
+
+ private OssDataSegmentKiller segmentKiller;
+
+ @Test
+ public void test_killAll_accountConfigWithNullBucketAndBaseKey_throwsISEException() throws IOException
+ {
+   // Neither bucket nor prefix is configured; getBucket() has to be consulted at least once.
+   EasyMock.expect(segmentPusherConfig.getBucket())
+           .andReturn(null).atLeastOnce();
+   EasyMock.expect(segmentPusherConfig.getPrefix())
+           .andReturn(null).anyTimes();
+   EasyMock.replay(client, segmentPusherConfig, inputDataConfig);
+
+   boolean iseCaught = false;
+   try {
+     segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
+     segmentKiller.killAll();
+   }
+   catch (ISE expected) {
+     iseCaught = true;
+   }
+
+   // The ISE may come from the constructor or from killAll(); either satisfies the test.
+   Assert.assertTrue(iseCaught);
+   EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
+ }
+
+ @Test
+ public void test_killAll_noException_deletesAllSegments() throws IOException
+ {
+   // Two objects are listed under the prefix and two single-key delete requests are expected (MAX_KEYS is 1).
+   final OSSObjectSummary firstObject = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+   final OSSObjectSummary secondObject = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
+   OssTestUtils.expectListObjects(
+       client,
+       PREFIX_URI,
+       ImmutableList.of(firstObject, secondObject)
+   );
+
+   final DeleteObjectsRequest deleteFirst = new DeleteObjectsRequest(TEST_BUCKET);
+   deleteFirst.setKeys(Collections.singletonList(KEY_1));
+   final DeleteObjectsRequest deleteSecond = new DeleteObjectsRequest(TEST_BUCKET);
+   deleteSecond.setKeys(Collections.singletonList(KEY_2));
+   // Empty map: no delete request is associated with an exception.
+   OssTestUtils.mockClientDeleteObjects(
+       client,
+       ImmutableList.of(deleteFirst, deleteSecond),
+       ImmutableMap.of()
+   );
+
+   EasyMock.expect(segmentPusherConfig.getBucket())
+           .andReturn(TEST_BUCKET).anyTimes();
+   EasyMock.expect(segmentPusherConfig.getPrefix())
+           .andReturn(TEST_PREFIX).anyTimes();
+   EasyMock.expect(inputDataConfig.getMaxListingLength())
+           .andReturn(MAX_KEYS).anyTimes();
+
+   EasyMock.replay(client, segmentPusherConfig, inputDataConfig);
+
+   segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
+   segmentKiller.killAll();
+
+   EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
+ }
+
+ @Test
+ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException
+ {
+   // NOTE(review): despite the method name, the exception is attached to the delete request, not to listing.
+   final OSSObjectSummary onlyObject = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+   OssTestUtils.expectListObjects(
+       client,
+       PREFIX_URI,
+       ImmutableList.of(onlyObject)
+   );
+
+   final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(TEST_BUCKET);
+   deleteRequest.setKeys(Collections.singletonList(KEY_1));
+   // The same request is both expected and mapped to RECOVERABLE_EXCEPTION; killAll() must still complete.
+   OssTestUtils.mockClientDeleteObjects(
+       client,
+       ImmutableList.of(deleteRequest),
+       ImmutableMap.of(deleteRequest, RECOVERABLE_EXCEPTION)
+   );
+
+   EasyMock.expect(segmentPusherConfig.getBucket())
+           .andReturn(TEST_BUCKET).anyTimes();
+   EasyMock.expect(segmentPusherConfig.getPrefix())
+           .andReturn(TEST_PREFIX).anyTimes();
+   EasyMock.expect(inputDataConfig.getMaxListingLength())
+           .andReturn(MAX_KEYS).anyTimes();
+
+   EasyMock.replay(client, segmentPusherConfig, inputDataConfig);
+
+   segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
+   segmentKiller.killAll();
+
+   EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
+ }
+
+ @Test
+ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments()
+ {
+   // NOTE(review): despite the method name, the exception is attached to the delete request, not to listing.
+   boolean ioExceptionCaught = false;
+
+   try {
+     final OSSObjectSummary onlyObject = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+     OssTestUtils.expectListObjects(
+         client,
+         PREFIX_URI,
+         ImmutableList.of(onlyObject)
+     );
+
+     final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(TEST_BUCKET);
+     deleteRequest.withKeys(ImmutableList.of(KEY_1));
+     // The request appears only in the exception map, paired with NON_RECOVERABLE_EXCEPTION.
+     OssTestUtils.mockClientDeleteObjects(
+         client,
+         ImmutableList.of(),
+         ImmutableMap.of(deleteRequest, NON_RECOVERABLE_EXCEPTION)
+     );
+
+     EasyMock.expect(segmentPusherConfig.getBucket())
+             .andReturn(TEST_BUCKET).anyTimes();
+     EasyMock.expect(segmentPusherConfig.getPrefix())
+             .andReturn(TEST_PREFIX).anyTimes();
+     EasyMock.expect(inputDataConfig.getMaxListingLength())
+             .andReturn(MAX_KEYS).anyTimes();
+
+     EasyMock.replay(client, segmentPusherConfig, inputDataConfig);
+
+     segmentKiller = new OssDataSegmentKiller(client, segmentPusherConfig, inputDataConfig);
+     segmentKiller.killAll();
+   }
+   catch (IOException expected) {
+     ioExceptionCaught = true;
+   }
+
+   // The test passes only if an IOException escaped from the block above.
+   Assert.assertTrue(ioExceptionCaught);
+   EasyMock.verify(client, segmentPusherConfig, inputDataConfig);
+ }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentMoverTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentMoverTest.java
new file mode 100644
index 00000000000..66c6f25006f
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentMoverTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.CopyObjectRequest;
+import com.aliyun.oss.model.CopyObjectResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.PutObjectResult;
+import com.aliyun.oss.model.StorageClass;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.MapUtils;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class OssDataSegmentMoverTest
+{
+ private static final DataSegment SOURCE_SEGMENT = new DataSegment(
+ "test",
+ Intervals.of("2013-01-01/2013-01-02"),
+ "1",
+ ImmutableMap.of(
+ "key",
+ "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+ "bucket",
+ "main"
+ ),
+ ImmutableList.of("dim1", "dim1"),
+ ImmutableList.of("metric1", "metric2"),
+ NoneShardSpec.instance(),
+ 0,
+ 1
+ );
+
+ @Test
+ public void testMove() throws Exception
+ {
+   final MockClient mockClient = new MockClient();
+   final OssDataSegmentMover mover = new OssDataSegmentMover(mockClient, new OssStorageConfig());
+
+   // Seed the fake client with the object that SOURCE_SEGMENT's load spec points at.
+   mockClient.putObject("main", "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip");
+
+   final DataSegment movedSegment = mover.move(
+       SOURCE_SEGMENT,
+       ImmutableMap.of("baseKey", "targetBaseKey", "bucket", "archive")
+   );
+
+   final Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+   Assert.assertEquals(
+       "targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+       MapUtils.getString(targetLoadSpec, "key")
+   );
+   Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+
+   // didMove() is true only when the client saw both a copy and a delete.
+   Assert.assertTrue(mockClient.didMove());
+ }
+
+ @Test
+ public void testMoveNoop() throws Exception
+ {
+   final MockClient mockOssClient = new MockClient();
+   final OssDataSegmentMover mover = new OssDataSegmentMover(mockOssClient, new OssStorageConfig());
+
+   // Only the target object exists up front; the source object was never stored in the fake client.
+   // The mover is then expected to report the target location without copying and deleting.
+   mockOssClient.putObject("archive", "targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip");
+
+   final DataSegment movedSegment = mover.move(
+       SOURCE_SEGMENT,
+       ImmutableMap.of("baseKey", "targetBaseKey", "bucket", "archive")
+   );
+
+   final Map<String, Object> targetLoadSpec = movedSegment.getLoadSpec();
+   Assert.assertEquals(
+       "targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+       MapUtils.getString(targetLoadSpec, "key")
+   );
+   Assert.assertEquals("archive", MapUtils.getString(targetLoadSpec, "bucket"));
+
+   // didMove() requires both a copy and a delete, so it must still be false here.
+   Assert.assertFalse(mockOssClient.didMove());
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void testMoveException() throws Exception
+ {
+   // Nothing is stored in the fake client, so neither the source nor the target object exists.
+   final MockClient emptyClient = new MockClient();
+   final OssDataSegmentMover mover = new OssDataSegmentMover(emptyClient, new OssStorageConfig());
+   mover.move(
+       SOURCE_SEGMENT,
+       ImmutableMap.of("baseKey", "targetBaseKey", "bucket", "archive")
+   );
+ }
+
+ @Test
+ public void testIgnoresGoneButAlreadyMoved() throws Exception
+ {
+   // Target bucket and base key equal the segment's current ones and the client is empty; no exception expected.
+   final MockClient mockOssClient = new MockClient();
+   final OssDataSegmentMover mover = new OssDataSegmentMover(mockOssClient, new OssStorageConfig());
+   final DataSegment segmentInMissingBucket = new DataSegment(
+       "test",
+       Intervals.of("2013-01-01/2013-01-02"),
+       "1",
+       ImmutableMap.of(
+           "key", "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+           "bucket", "DOES NOT EXIST"
+       ),
+       ImmutableList.of("dim1", "dim1"),
+       ImmutableList.of("metric1", "metric2"),
+       NoneShardSpec.instance(),
+       0,
+       1
+   );
+   mover.move(segmentInMissingBucket, ImmutableMap.of("bucket", "DOES NOT EXIST", "baseKey", "baseKey"));
+ }
+
+ @Test(expected = SegmentLoadingException.class)
+ public void testFailsToMoveMissing() throws Exception
+ {
+   // The target base key ("baseKey2") differs from the segment's and the client is empty; the move must fail.
+   final MockClient client = new MockClient();
+   final OssDataSegmentMover mover = new OssDataSegmentMover(client, new OssStorageConfig());
+   final DataSegment segmentInMissingBucket = new DataSegment(
+       "test",
+       Intervals.of("2013-01-01/2013-01-02"),
+       "1",
+       ImmutableMap.of(
+           "key", "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
+           "bucket", "DOES NOT EXIST"
+       ),
+       ImmutableList.of("dim1", "dim1"),
+       ImmutableList.of("metric1", "metric2"),
+       NoneShardSpec.instance(),
+       0,
+       1
+   );
+   mover.move(segmentInMissingBucket, ImmutableMap.of("bucket", "DOES NOT EXIST", "baseKey", "baseKey2"));
+ }
+
+ /**
+  * In-memory stand-in for the OSS client: remembers object keys per bucket and records whether a copy
+  * and a delete were requested, so tests can tell whether a move took place.
+  */
+ private static class MockClient extends OSSClient
+ {
+   private final Map<String, Set<String>> storage = new HashMap<>();
+   private boolean copied = false;
+   private boolean deletedOld = false;
+
+   private MockClient()
+   {
+     super("endpoint", "accessKeyId", "keySecret");
+   }
+
+   /** Returns true only when both copyObject and deleteObject have been called. */
+   public boolean didMove()
+   {
+     return copied && deletedOld;
+   }
+
+   @Override
+   public boolean doesObjectExist(String bucketName, String objectKey)
+   {
+     final Set<String> objects = storage.get(bucketName);
+     return objects != null && objects.contains(objectKey);
+   }
+
+   /** Treats the request prefix as an exact key: lists that single object if present, nothing otherwise. */
+   @Override
+   public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+   {
+     final String bucketName = listObjectsRequest.getBucketName();
+     final String objectKey = listObjectsRequest.getPrefix();
+     if (!doesObjectExist(bucketName, objectKey)) {
+       return new ObjectListing();
+     }
+
+     final OSSObjectSummary objectSummary = new OSSObjectSummary();
+     objectSummary.setBucketName(bucketName);
+     objectSummary.setKey(objectKey);
+     objectSummary.setStorageClass(StorageClass.Standard.name());
+
+     final ObjectListing result = new ObjectListing();
+     result.setBucketName(bucketName);
+     result.setPrefix(objectKey);
+     result.getObjectSummaries().add(objectSummary);
+     // NOTE(review): flagged as truncated although only one summary is returned; confirm this is intended.
+     result.setTruncated(true);
+     return result;
+   }
+
+   /** Copies the key into the destination bucket, or throws a "NoSuchKey" OSSException if the source is absent. */
+   @Override
+   public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
+   {
+     final String sourceBucketName = copyObjectRequest.getSourceBucketName();
+     final String sourceObjectKey = copyObjectRequest.getSourceKey();
+     final String destinationBucketName = copyObjectRequest.getDestinationBucketName();
+     final String destinationObjectKey = copyObjectRequest.getDestinationKey();
+     // Recorded before the existence check, so a failed copy still counts as attempted.
+     copied = true;
+     if (!doesObjectExist(sourceBucketName, sourceObjectKey)) {
+       throw new OSSException("OssDataSegmentMoverTest", "NoSuchKey", null, null, null, null, null);
+     }
+     storage.computeIfAbsent(destinationBucketName, k -> new HashSet<>())
+            .add(destinationObjectKey);
+     return new CopyObjectResult();
+   }
+
+   @Override
+   public void deleteObject(String bucket, String objectKey)
+   {
+     deletedOld = true;
+     // Throws NullPointerException if nothing was ever stored in this bucket.
+     storage.get(bucket).remove(objectKey);
+   }
+
+   /** Convenience overload for tests: stores only the key, no content. */
+   public PutObjectResult putObject(String bucketName, String key)
+   {
+     return putObject(bucketName, key, (File) null);
+   }
+
+   @Override
+   public PutObjectResult putObject(String bucketName, String key, File file)
+   {
+     storage.computeIfAbsent(bucketName, bName -> new HashSet<>()).add(key);
+     return new PutObjectResult();
+   }
+ }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPullerTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPullerTest.java
new file mode 100644
index 00000000000..46584cac5e1
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPullerTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObject;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Tests for {@link OssDataSegmentPuller} that run against an EasyMock strict mock of the OSS client.
+ */
+public class OssDataSegmentPullerTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testSimpleGetVersion() throws IOException
+ {
+   final String bucket = "bucket";
+   final String keyPrefix = "prefix/dir/0";
+   final OSS ossClient = EasyMock.createStrictMock(OSS.class);
+
+   // The listing returned by the mock holds one object whose last-modified time is the epoch.
+   final OSSObjectSummary summary = new OSSObjectSummary();
+   summary.setBucketName(bucket);
+   summary.setKey(keyPrefix + "/renames-0.gz");
+   summary.setLastModified(new Date(0));
+   final ObjectListing listing = new ObjectListing();
+   listing.getObjectSummaries().add(summary);
+
+   EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+           .andReturn(listing)
+           .once();
+   final OssDataSegmentPuller puller = new OssDataSegmentPuller(ossClient);
+   EasyMock.replay(ossClient);
+   final URI objectUri = URI.create(
+       StringUtils.format(OssStorageDruidModule.SCHEME + "://%s/%s", bucket, summary.getKey())
+   );
+   final String version = puller.getVersion(objectUri);
+   EasyMock.verify(ossClient);
+   // Expect the version to be the object's last-modified time in epoch millis, as a decimal string.
+   Assert.assertEquals(StringUtils.format("%d", new Date(0).getTime()), version);
+ }
+
+ @Test
+ public void testGZUncompress() throws IOException, SegmentLoadingException
+ {
+ final String bucket = "bucket";
+ final String keyPrefix = "prefix/dir/0";
+ final OSS ossClient = EasyMock.createStrictMock(OSS.class);
+ final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);
+ // Write the payload to a local gzip file that will serve as the remote object's content.
+ final File tmpFile = temporaryFolder.newFile("gzTest.gz");
+
+ try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {
+ outputStream.write(value);
+ }
+ // The object the mocked client returns from getObject(); its content streams the gzip file.
+ final OSSObject object0 = new OSSObject();
+ object0.setBucketName(bucket);
+ object0.setKey(keyPrefix + "/renames-0.gz");
+ object0.getObjectMetadata().setLastModified(new Date(0));
+ object0.setObjectContent(new FileInputStream(tmpFile));
+ // The summary the mocked client returns from listObjects().
+ final OSSObjectSummary objectSummary = new OSSObjectSummary();
+ objectSummary.setBucketName(bucket);
+ objectSummary.setKey(keyPrefix + "/renames-0.gz");
+ objectSummary.setLastModified(new Date(0));
+
+ final ObjectListing listObjectsResult = new ObjectListing();
+ listObjectsResult.getObjectSummaries().add(objectSummary);
+
+ final File tmpDir = temporaryFolder.newFolder("gzTestDir");
+ // Strict mock: doesObjectExist, listObjects and getObject must be called in exactly this order.
+ EasyMock.expect(ossClient.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
+ .andReturn(true)
+ .once();
+ EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(listObjectsResult)
+ .once();
+ EasyMock.expect(ossClient.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
+ .andReturn(object0)
+ .once();
+ OssDataSegmentPuller puller = new OssDataSegmentPuller(ossClient);
+
+ EasyMock.replay(ossClient);
+ FileUtils.FileCopyResult result = puller.getSegmentFiles(
+ new CloudObjectLocation(
+ bucket,
+ object0.getKey()
+ ), tmpDir
+ );
+ EasyMock.verify(ossClient);
+ // The uncompressed payload is expected in tmpDir as "renames-0" (the key's file name without ".gz").
+ Assert.assertEquals(value.length, result.size());
+ File expected = new File(tmpDir, "renames-0");
+ Assert.assertTrue(expected.exists());
+ Assert.assertEquals(value.length, expected.length());
+ }
+
+ @Test
+ public void testGZUncompressRetries() throws IOException, SegmentLoadingException
+ {
+ final String bucket = "bucket";
+ final String keyPrefix = "prefix/dir/0";
+ final OSS ossClient = EasyMock.createStrictMock(OSS.class);
+ final byte[] value = bucket.getBytes(StandardCharsets.UTF_8);
+ // Write the payload to a local gzip file that will serve as the remote object's content.
+ final File tmpFile = temporaryFolder.newFile("gzTest.gz");
+
+ try (OutputStream outputStream = new GZIPOutputStream(new FileOutputStream(tmpFile))) {
+ outputStream.write(value);
+ }
+ // The object handed out by the second, successful getObject() call.
+ OSSObject object0 = new OSSObject();
+
+ object0.setBucketName(bucket);
+ object0.setKey(keyPrefix + "/renames-0.gz");
+ object0.getObjectMetadata().setLastModified(new Date(0));
+ object0.setObjectContent(new FileInputStream(tmpFile));
+ // The summary the mocked client returns from both listObjects() calls.
+ final OSSObjectSummary objectSummary = new OSSObjectSummary();
+ objectSummary.setBucketName(bucket);
+ objectSummary.setKey(keyPrefix + "/renames-0.gz");
+ objectSummary.setLastModified(new Date(0));
+
+ final ObjectListing listObjectsResult = new ObjectListing();
+ listObjectsResult.getObjectSummaries().add(objectSummary);
+
+ File tmpDir = temporaryFolder.newFolder("gzTestDir");
+ // Strict order: doesObjectExist, listObjects, a getObject that throws, listObjects again, a getObject that succeeds.
+ OSSException exception = new OSSException("OssDataSegmentPullerTest", "NoSuchKey", null, null, null, null, null);
+ EasyMock.expect(ossClient.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
+ .andReturn(true)
+ .once();
+ EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(listObjectsResult)
+ .once();
+ EasyMock.expect(ossClient.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
+ .andThrow(exception)
+ .once();
+ EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(listObjectsResult)
+ .once();
+ EasyMock.expect(ossClient.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
+ .andReturn(object0)
+ .once();
+ OssDataSegmentPuller puller = new OssDataSegmentPuller(ossClient);
+
+ EasyMock.replay(ossClient);
+ FileUtils.FileCopyResult result = puller.getSegmentFiles(
+ new CloudObjectLocation(
+ bucket,
+ object0.getKey()
+ ), tmpDir
+ );
+ EasyMock.verify(ossClient);
+ // Despite the first failure, the uncompressed payload is expected in tmpDir as "renames-0".
+ Assert.assertEquals(value.length, result.size());
+ File expected = new File(tmpDir, "renames-0");
+ Assert.assertTrue(expected.exists());
+ Assert.assertEquals(value.length, expected.length());
+ }
+
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherConfigTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherConfigTest.java
new file mode 100644
index 00000000000..d558a08068c
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherConfigTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class OssDataSegmentPusherConfigTest
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  @Test
+  public void testSerialization() throws IOException
+  {
+    // A config read from JSON must serialize back to exactly the same JSON.
+    final String jsonConfig = "{\"bucket\":\"bucket1\",\"prefix\":\"dataSource1\"}";
+    final OssStorageConfig parsed = JSON_MAPPER.readValue(jsonConfig, OssStorageConfig.class);
+    Assert.assertEquals(jsonConfig, JSON_MAPPER.writeValueAsString(parsed));
+  }
+
+  @Test
+  public void testSerializationWithDefaults() throws IOException
+  {
+    // NOTE(review): input and expected output are identical, so this currently duplicates testSerialization.
+    final String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"prefix\":\"dataSource1\"}";
+    final String jsonConfig = "{\"bucket\":\"bucket1\",\"prefix\":\"dataSource1\"}";
+    final OssStorageConfig parsed = JSON_MAPPER.readValue(jsonConfig, OssStorageConfig.class);
+    Assert.assertEquals(expectedJsonConfig, JSON_MAPPER.writeValueAsString(parsed));
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherTest.java
new file mode 100644
index 00000000000..b3d91c7af54
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssDataSegmentPusherTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.PutObjectResult;
+import com.google.common.io.Files;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+/**
+ * Tests that {@link OssDataSegmentPusher} uploads a segment through a mocked {@link OSS} client and
+ * returns a segment whose load spec carries the expected type, bucket and key.
+ */
+public class OssDataSegmentPusherTest
+{
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  /**
+   * Pushing without a unique path must yield a key that ends directly in {@code /0/0/index.zip}.
+   */
+  @Test
+  public void testPush() throws Exception
+  {
+    testPushInternal(false, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip");
+  }
+
+  /**
+   * Pushing with a unique path must add a 36-character path element (letters, digits, dashes) before index.zip.
+   */
+  @Test
+  public void testPushUseUniquePath() throws Exception
+  {
+    testPushInternal(
+        true,
+        "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip"
+    );
+  }
+
+  /**
+   * Pushes a one-file segment directory through a strict mock client and checks the returned segment.
+   *
+   * @param useUniquePath forwarded unchanged to the pusher's {@code push} call
+   * @param matcher       regular expression that the "key" entry of the load spec must match in full
+   */
+  private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
+  {
+    // Strict mock: the single putObject call recorded below is the only client interaction allowed.
+    OSS client = EasyMock.createStrictMock(OSS.class);
+
+    EasyMock.expect(client.putObject(EasyMock.anyObject()))
+            .andReturn(new PutObjectResult())
+            .once();
+
+    EasyMock.replay(client);
+
+    OssStorageConfig config = new OssStorageConfig();
+    config.setBucket("bucket");
+    config.setPrefix("key");
+
+    OssDataSegmentPusher pusher = new OssDataSegmentPusher(client, config);
+
+    // Create a mock segment on disk: a single version.bin whose four bytes are 0, 0, 0, 1.
+    File tmp = tempFolder.newFile("version.bin");
+
+    final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
+    Files.write(data, tmp);
+    final long size = data.length;
+
+    // Input segment handed to the pusher; the assertions below inspect the segment it returns.
+    DataSegment segmentToPush = new DataSegment(
+        "foo",
+        Intervals.of("2015/2016"),
+        "0",
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        size
+    );
+
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath);
+
+    Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
+    Assert.assertEquals(1, (int) segment.getBinaryVersion());
+    Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket"));
+    // The actual key doubles as the failure message, so a mismatch shows what was produced.
+    final String key = segment.getLoadSpec().get("key").toString();
+    Assert.assertTrue(key, Pattern.compile(matcher).matcher(key).matches());
+    Assert.assertEquals("oss_zip", segment.getLoadSpec().get("type"));
+
+    // Fails unless the expected putObject call actually happened.
+    EasyMock.verify(client);
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssObjectSummaryIteratorTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssObjectSummaryIteratorTest.java
new file mode 100644
index 00000000000..d124b6bf6f9
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssObjectSummaryIteratorTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Exercises {@code OssUtils.objectSummaryIterator} against an in-memory stand-in for the OSS client,
+ * varying the listing prefixes and the maximum listing length.
+ */
+public class OssObjectSummaryIteratorTest
+{
+  // Fixture listing served by the mock client; it includes zero-size entries and a foreign-bucket object.
+  private static final ImmutableList<OSSObjectSummary> TEST_OBJECTS =
+      ImmutableList.of(
+          makeObjectSummary("b", "foo", 10L),
+          makeObjectSummary("b", "foo/", 0L), // directory
+          makeObjectSummary("b", "foo/bar1", 10L),
+          makeObjectSummary("b", "foo/bar2", 10L),
+          makeObjectSummary("b", "foo/bar3", 10L),
+          makeObjectSummary("b", "foo/bar4", 10L),
+          makeObjectSummary("b", "foo/bar5", 0L), // empty object
+          makeObjectSummary("b", "foo/baz", 10L),
+          makeObjectSummary("bucketnotmine", "a/different/bucket", 10L),
+          makeObjectSummary("b", "foo/bar/", 0L) // another directory at the end of list
+      );
+
+  @Test
+  public void testSingleObject()
+  {
+    test(
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/baz"),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/baz"),
+        5
+    );
+  }
+
+  @Test
+  public void testMultiObjectOneKeyAtATime()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/"),
+        1
+    );
+  }
+
+  @Test
+  public void testMultiObjectTwoKeysAtATime()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/"),
+        2
+    );
+  }
+
+  @Test
+  public void testMultiObjectTenKeysAtATime()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/"),
+        10
+    );
+  }
+
+  @Test
+  public void testPrefixInMiddleOfKey()
+  {
+    test(
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/bar1", OssStorageDruidModule.SCHEME + "://b/foo/bar2", OssStorageDruidModule.SCHEME + "://b/foo/bar3", OssStorageDruidModule.SCHEME + "://b/foo/bar4"),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/bar"),
+        10
+    );
+  }
+
+  @Test
+  public void testNoPath()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b"),
+        10
+    );
+  }
+
+  @Test
+  public void testSlashPath()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/"),
+        10
+    );
+  }
+
+  @Test
+  public void testDifferentBucket()
+  {
+    test(
+        ImmutableList.of(),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://bx/foo/"),
+        10
+    );
+  }
+
+  @Test
+  public void testWithMultiplePrefixesReturningAllNonEmptyObjectsStartingWithOneOfPrefixes()
+  {
+    test(
+        ImmutableList.of(
+            OssStorageDruidModule.SCHEME + "://b/foo/bar1",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar2",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar3",
+            OssStorageDruidModule.SCHEME + "://b/foo/bar4",
+            OssStorageDruidModule.SCHEME + "://b/foo/baz"
+        ),
+        ImmutableList.of(OssStorageDruidModule.SCHEME + "://b/foo/bar", OssStorageDruidModule.SCHEME + "://b/foo/baz"),
+        10
+    );
+  }
+
+  /**
+   * Runs the iterator over {@link #TEST_OBJECTS} for the given prefix URIs and asserts that the URIs of
+   * the returned summaries equal {@code expectedUris}, in order.
+   */
+  private static void test(final List<String> expectedUris, final List<String> prefixes, final int maxListingLength)
+  {
+    final List<OSSObjectSummary> expectedObjects = new ArrayList<>();
+
+    // O(N^2) but who cares -- the list is short.
+    for (final String uri : expectedUris) {
+      final List<OSSObjectSummary> matches =
+          TEST_OBJECTS.stream()
+                      .filter(summary -> OssUtils.summaryToUri(summary).toString().equals(uri))
+                      .collect(Collectors.toList());
+
+      expectedObjects.add(Iterables.getOnlyElement(matches));
+    }
+
+    final List<OSSObjectSummary> actualObjects = ImmutableList.copyOf(
+        OssUtils.objectSummaryIterator(
+            makeMockClient(TEST_OBJECTS),
+            prefixes.stream().map(URI::create).collect(Collectors.toList()),
+            maxListingLength
+        )
+    );
+
+    Assert.assertEquals(
+        prefixes.toString(),
+        expectedObjects.stream().map(OssUtils::summaryToUri).collect(Collectors.toList()),
+        actualObjects.stream().map(OssUtils::summaryToUri).collect(Collectors.toList())
+    );
+  }
+
+  /**
+   * Makes a mock OSS client that handles enough of "listObjects" to test the functionality of the
+   * {@link OssObjectSummaryIterator} class.
+   */
+  private static OSS makeMockClient(final List<OSSObjectSummary> objects)
+  {
+    return new OSSClient("endpoint", "accessKey", "keySecret")
+    {
+      @Override
+      public ObjectListing listObjects(final ListObjectsRequest request)
+      {
+        // The marker (continuation token) is an index into the "objects" list.
+        final String continuationToken = request.getMarker();
+        final int startIndex = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
+
+        // Find matching objects.
+        final List<OSSObjectSummary> summaries = new ArrayList<>();
+        int nextIndex = -1;
+
+        for (int i = startIndex; i < objects.size(); i++) {
+          final OSSObjectSummary summary = objects.get(i);
+
+          if (summary.getBucketName().equals(request.getBucketName())
+              && summary.getKey().startsWith(request.getPrefix())) {
+            if (summaries.size() == request.getMaxKeys()) {
+              // We reached our max key limit; set nextIndex (which will lead to a result with truncated = true).
+              nextIndex = i;
+              break;
+            }
+            // Generate a summary.
+            summaries.add(summary);
+          }
+        }
+
+        // Generate the result.
+        final ObjectListing retVal = new ObjectListing();
+        retVal.getObjectSummaries().addAll(summaries);
+
+        if (nextIndex >= 0) {
+          retVal.setTruncated(true);
+          retVal.setNextMarker(String.valueOf(nextIndex));
+        }
+
+        return retVal;
+      }
+    };
+  }
+
+  // Builds a summary that carries only the bucket name, key and size used by these tests.
+  private static OSSObjectSummary makeObjectSummary(final String bucket, final String key, final long size)
+  {
+    final OSSObjectSummary summary = new OSSObjectSummary();
+    summary.setBucketName(bucket);
+    summary.setKey(key);
+    summary.setSize(size);
+    return summary;
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
new file mode 100644
index 00000000000..1264a0fe9d7
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTaskLogsTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.AccessControlList;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.Grant;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.Owner;
+import com.aliyun.oss.model.PutObjectRequest;
+import com.aliyun.oss.model.PutObjectResult;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
+import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests {@link OssTaskLogs} against EasyMock mocks of the OSS client and the clock: pushing a task log,
+ * and deleting task logs through {@code killAll} and {@code killOlderThan}.
+ */
+@RunWith(EasyMockRunner.class)
+public class OssTaskLogsTest extends EasyMockSupport
+{
+  private static final String KEY_1 = "key1";
+  private static final String KEY_2 = "key2";
+  private static final String TEST_BUCKET = "test_bucket";
+  private static final String TEST_PREFIX = "test_prefix";
+  private static final URI PREFIX_URI = URI.create(StringUtils.format("oss://%s/%s", TEST_BUCKET, TEST_PREFIX));
+  private static final long TIME_0 = 0L;
+  private static final long TIME_1 = 1L;
+  private static final long TIME_NOW = 2L;
+  private static final long TIME_FUTURE = 3L;
+  private static final int MAX_KEYS = 1;
+  private static final Exception RECOVERABLE_EXCEPTION = new ClientException(new IOException());
+  private static final Exception NON_RECOVERABLE_EXCEPTION = new ClientException(new NullPointerException());
+
+  @Mock
+  private CurrentTimeMillisSupplier timeSupplier;
+  @Mock
+  private OSS ossClient;
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  /**
+   * With ACLs disabled, pushing a task log must leave the grant collection of the bucket ACL empty.
+   */
+  @Test
+  public void testTaskLogsPushWithAclDisabled() throws Exception
+  {
+    String ownerId = "test_owner";
+    String ownerDisplayName = "test_owner";
+
+    List<Grant> grantList = testPushInternal(true, ownerId, ownerDisplayName);
+
+    Assert.assertNotNull("Grant list should not be null", grantList);
+    Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size());
+  }
+
+  /**
+   * killAll must read the current time from the supplier, list the prefix, and issue one delete
+   * request per listed key when no errors occur.
+   */
+  @Test
+  public void test_killAll_noException_deletesAllTaskLogs() throws IOException
+  {
+    OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+    OSSObjectSummary objectSummary2 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
+
+    EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+
+    OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1, objectSummary2));
+
+    DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteRequest1.setKeys(Collections.singletonList(KEY_1));
+    DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteRequest2.setKeys(Collections.singletonList(KEY_2));
+
+    OssTestUtils.mockClientDeleteObjects(ossClient, ImmutableList.of(deleteRequest1, deleteRequest2), ImmutableMap.of());
+
+    EasyMock.replay(ossClient, timeSupplier);
+
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setBucket(TEST_BUCKET);
+    config.setPrefix(TEST_PREFIX);
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    taskLogs.killAll();
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * The delete request is stubbed to throw a recoverable exception first and to succeed afterwards;
+   * killAll must still complete without throwing.
+   */
+  @Test
+  public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
+  {
+    OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+
+    EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+
+    OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1));
+
+    DeleteObjectsRequest expectedRequest = new DeleteObjectsRequest(TEST_BUCKET);
+    expectedRequest.setKeys(Collections.singletonList(KEY_1));
+    OssTestUtils.mockClientDeleteObjects(
+        ossClient,
+        ImmutableList.of(expectedRequest),
+        ImmutableMap.of(expectedRequest, RECOVERABLE_EXCEPTION)
+    );
+
+    EasyMock.replay(ossClient, timeSupplier);
+
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setBucket(TEST_BUCKET);
+    config.setPrefix(TEST_PREFIX);
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    taskLogs.killAll();
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * A non-recoverable exception must surface from killAll as an IOException. Despite the method name,
+   * the failure is injected into the delete call, not into the listing.
+   */
+  @Test
+  public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
+  {
+    boolean ioExceptionThrown = false;
+    try {
+      OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+      EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
+      OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1));
+
+      DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET);
+      deleteRequest1.setKeys(Collections.singletonList(KEY_1));
+      OssTestUtils.mockClientDeleteObjects(
+          ossClient,
+          ImmutableList.of(),
+          ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION)
+      );
+
+      EasyMock.replay(ossClient, timeSupplier);
+
+      OssTaskLogsConfig config = new OssTaskLogsConfig();
+      config.setBucket(TEST_BUCKET);
+      config.setPrefix(TEST_PREFIX);
+      OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+      inputDataConfig.setMaxListingLength(MAX_KEYS);
+      OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+      taskLogs.killAll();
+    }
+    catch (IOException e) {
+      ioExceptionThrown = true;
+    }
+
+    Assert.assertTrue(ioExceptionThrown);
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * killOlderThan(TIME_NOW) must delete only KEY_1 (modified at TIME_0); no delete is expected for
+   * KEY_2, which is stamped TIME_FUTURE.
+   */
+  @Test
+  public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException
+  {
+    OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+    OSSObjectSummary objectSummary2 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_2, TIME_FUTURE);
+
+    OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1, objectSummary2));
+
+    DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteRequest1.setKeys(Collections.singletonList(KEY_1));
+
+    OssTestUtils.mockClientDeleteObjects(ossClient, ImmutableList.of(deleteRequest1), ImmutableMap.of());
+
+    EasyMock.replay(ossClient, timeSupplier);
+
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setBucket(TEST_BUCKET);
+    config.setPrefix(TEST_PREFIX);
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    taskLogs.killOlderThan(TIME_NOW);
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * The delete request throws a recoverable exception first and then succeeds; killOlderThan must
+   * complete. Despite the method name, the failure is injected into the delete call.
+   */
+  @Test
+  public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException
+  {
+    OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+
+    OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1));
+
+    DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteRequest1.setKeys(Collections.singletonList(KEY_1));
+
+    OssTestUtils.mockClientDeleteObjects(
+        ossClient,
+        ImmutableList.of(deleteRequest1),
+        ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION)
+    );
+
+    EasyMock.replay(ossClient, timeSupplier);
+
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setBucket(TEST_BUCKET);
+    config.setPrefix(TEST_PREFIX);
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    inputDataConfig.setMaxListingLength(MAX_KEYS);
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+    taskLogs.killOlderThan(TIME_NOW);
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * A non-recoverable exception must surface from killOlderThan as an IOException. Despite the method
+   * name, the failure is injected into the delete call, not into the listing.
+   */
+  @Test
+  public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
+  {
+    boolean ioExceptionThrown = false;
+    try {
+      OSSObjectSummary objectSummary1 = OssTestUtils.newOSSObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
+      OssTestUtils.expectListObjects(ossClient, PREFIX_URI, ImmutableList.of(objectSummary1));
+
+      DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET);
+      deleteRequest1.setKeys(Collections.singletonList(KEY_1));
+      OssTestUtils.mockClientDeleteObjects(
+          ossClient,
+          ImmutableList.of(),
+          ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION)
+      );
+
+      EasyMock.replay(ossClient, timeSupplier);
+
+      OssTaskLogsConfig config = new OssTaskLogsConfig();
+      config.setBucket(TEST_BUCKET);
+      config.setPrefix(TEST_PREFIX);
+      OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+      inputDataConfig.setMaxListingLength(MAX_KEYS);
+      OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
+      taskLogs.killOlderThan(TIME_NOW);
+    }
+    catch (IOException e) {
+      ioExceptionThrown = true;
+    }
+
+    Assert.assertTrue(ioExceptionThrown);
+
+    EasyMock.verify(ossClient, timeSupplier);
+  }
+
+  /**
+   * Pushes an empty temporary log file with the given ACL setting and returns a copy of the grants held
+   * by the {@link AccessControlList} that the mocked {@code getBucketAcl} call is stubbed to return.
+   */
+  private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
+  {
+    EasyMock.expect(ossClient.putObject(EasyMock.anyObject())).andReturn(new PutObjectResult()).once();
+
+    AccessControlList aclExpected = new AccessControlList();
+    aclExpected.setOwner(new Owner(ownerId, ownerDisplayName));
+    EasyMock.expect(ossClient.getBucketAcl(TEST_BUCKET)).andReturn(aclExpected).once();
+
+    EasyMock.expect(ossClient.putObject(EasyMock.anyObject(PutObjectRequest.class)))
+            .andReturn(new PutObjectResult())
+            .once();
+
+    EasyMock.replay(ossClient);
+
+    OssTaskLogsConfig config = new OssTaskLogsConfig();
+    config.setDisableAcl(disableAcl);
+    config.setBucket(TEST_BUCKET);
+    CurrentTimeMillisSupplier realTimeSupplier = new CurrentTimeMillisSupplier();
+    OssInputDataConfig inputDataConfig = new OssInputDataConfig();
+    OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, realTimeSupplier);
+
+    String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z";
+    File logFile = tempFolder.newFile("test_log_file");
+    taskLogs.pushTaskLog(taskId, logFile);
+
+    return new ArrayList<>(aclExpected.getGrants());
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTestUtils.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTestUtils.java
new file mode 100644
index 00000000000..35ef96663e0
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTestUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.DeleteObjectsResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IArgumentMatcher;
+import org.easymock.IExpectationSetters;
+import org.joda.time.DateTime;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static EasyMock helpers and fixture factories shared by the aliyun OSS extension tests.
+ */
+public class OssTestUtils extends EasyMockSupport
+{
+  private static final DateTime NOW = DateTimes.nowUtc();
+  private static final byte[] CONTENT = StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
+
+  /**
+   * Registers an EasyMock argument matcher that accepts a {@link DeleteObjectsRequest} with the same
+   * bucket name and the same key list as {@code deleteObjectsRequest}. Always returns null.
+   */
+  public static DeleteObjectsRequest deleteObjectsRequestArgumentMatcher(DeleteObjectsRequest deleteObjectsRequest)
+  {
+    EasyMock.reportMatcher(new IArgumentMatcher()
+    {
+      @Override
+      public boolean matches(Object argument)
+      {
+        if (!(argument instanceof DeleteObjectsRequest)) {
+          return false;
+        }
+        final DeleteObjectsRequest actual = (DeleteObjectsRequest) argument;
+        // List equality already implies equal sizes, so no separate size comparison is needed.
+        return deleteObjectsRequest.getBucketName().equals(actual.getBucketName())
+               && deleteObjectsRequest.getKeys().equals(actual.getKeys());
+      }
+
+      @Override
+      public void appendTo(StringBuffer buffer)
+      {
+        buffer.append("DeleteObjectsRequest(\"bucketName:\" \"")
+              .append(deleteObjectsRequest.getBucketName())
+              .append("\", \"keys:\"")
+              .append(deleteObjectsRequest.getKeys())
+              .append("\")");
+      }
+    });
+    return null;
+  }
+
+  /**
+   * Expects exactly one {@code listObjects} call for the bucket and key prefix of {@code prefix} and
+   * answers it with a listing that holds {@code objectSummaries}.
+   */
+  public static void expectListObjects(OSS client, URI prefix, List<OSSObjectSummary> objectSummaries)
+  {
+    final ObjectListing result = new ObjectListing();
+    result.setBucketName(prefix.getAuthority());
+    result.getObjectSummaries().addAll(objectSummaries);
+
+    EasyMock.expect(
+        client.listObjects(matchListObjectsRequest(prefix))
+    ).andReturn(result).once();
+  }
+
+  /**
+   * Records {@code deleteObjects} expectations on {@code client}: each request in {@code requestToException}
+   * throws its exception, and each request in {@code deleteRequestsExpected} then returns a new result.
+   */
+  public static void mockClientDeleteObjects(
+      OSS client,
+      List<DeleteObjectsRequest> deleteRequestsExpected,
+      Map<DeleteObjectsRequest, Exception> requestToException
+  )
+  {
+    Map<DeleteObjectsRequest, IExpectationSetters<DeleteObjectsResult>> settersByRequest = new HashMap<>();
+
+    for (Map.Entry<DeleteObjectsRequest, Exception> requestsAndErrors : requestToException.entrySet()) {
+      DeleteObjectsRequest request = requestsAndErrors.getKey();
+      Exception exception = requestsAndErrors.getValue();
+      IExpectationSetters<DeleteObjectsResult> setter = settersByRequest.get(request);
+      if (setter == null) {
+        client.deleteObjects(OssTestUtils.deleteObjectsRequestArgumentMatcher(request));
+        setter = EasyMock.<DeleteObjectsResult>expectLastCall().andThrow(exception);
+        settersByRequest.put(request, setter);
+      } else {
+        setter.andThrow(exception);
+      }
+    }
+
+    for (DeleteObjectsRequest request : deleteRequestsExpected) {
+      IExpectationSetters<DeleteObjectsResult> setter = settersByRequest.get(request);
+      if (setter == null) {
+        client.deleteObjects(OssTestUtils.deleteObjectsRequestArgumentMatcher(request));
+        setter = EasyMock.<DeleteObjectsResult>expectLastCall();
+        settersByRequest.put(request, setter);
+      }
+      setter.andReturn(new DeleteObjectsResult());
+    }
+  }
+
+  /**
+   * Registers an EasyMock argument matcher that accepts a {@link ListObjectsRequest} whose bucket and
+   * prefix equal the authority and extracted key of {@code prefixUri}. Always returns null.
+   */
+  public static ListObjectsRequest matchListObjectsRequest(final URI prefixUri)
+  {
+    EasyMock.reportMatcher(
+        new IArgumentMatcher()
+        {
+          @Override
+          public boolean matches(Object argument)
+          {
+            if (!(argument instanceof ListObjectsRequest)) {
+              return false;
+            }
+            final ListObjectsRequest request = (ListObjectsRequest) argument;
+            return prefixUri.getAuthority().equals(request.getBucketName())
+                   && OssUtils.extractKey(prefixUri).equals(request.getPrefix());
+          }
+
+          @Override
+          public void appendTo(StringBuffer buffer)
+          {
+            buffer.append("<request for prefix [").append(prefixUri).append("]>");
+          }
+        }
+    );
+    return null;
+  }
+
+  /**
+   * Builds a summary with the given bucket, key and last-modified time, a fixed ETag and a fixed size.
+   */
+  public static OSSObjectSummary newOSSObjectSummary(String bucket, String key, long lastModifiedTimestamp)
+  {
+    OSSObjectSummary objectSummary = new OSSObjectSummary();
+    objectSummary.setBucketName(bucket);
+    objectSummary.setKey(key);
+    objectSummary.setLastModified(new Date(lastModifiedTimestamp));
+    objectSummary.setETag("etag");
+    objectSummary.setSize(CONTENT.length);
+    return objectSummary;
+  }
+}
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinderTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinderTest.java
new file mode 100644
index 00000000000..8443d2f4abc
--- /dev/null
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/storage/aliyun/OssTimestampVersionedDataFinderTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.aliyun;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import org.apache.druid.java.util.common.StringUtils;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.regex.Pattern;
+
+public class OssTimestampVersionedDataFinderTest
+{
+
+ @Test
+ public void testSimpleLatestVersion()
+ {
+ String bucket = "bucket";
+ String keyPrefix = "prefix/dir/0";
+ OSS client = EasyMock.createStrictMock(OSS.class);
+
+ OSSObjectSummary object0 = new OSSObjectSummary(), object1 = new OSSObjectSummary();
+
+ object0.setBucketName(bucket);
+ object0.setKey(keyPrefix + "/renames-0.gz");
+ object0.setLastModified(new Date(0));
+ object0.setSize(10);
+
+ object1.setBucketName(bucket);
+ object1.setKey(keyPrefix + "/renames-1.gz");
+ object1.setLastModified(new Date(1));
+ object1.setSize(10);
+
+ final ObjectListing result = new ObjectListing();
+ result.getObjectSummaries().add(object0);
+ result.getObjectSummaries().add(object1);
+ result.setTruncated(false);
+
+ EasyMock.expect(client.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(result)
+ .once();
+ OssTimestampVersionedDataFinder finder = new OssTimestampVersionedDataFinder(client);
+
+ Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
+
+ EasyMock.replay(client);
+
+
+ URI latest = finder.getLatestVersion(URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, keyPrefix)), pattern);
+
+ EasyMock.verify(client);
+
+ URI expected = URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, object1.getKey()));
+
+ Assert.assertEquals(expected, latest);
+ }
+
+ @Test
+ public void testMissing()
+ {
+ String bucket = "bucket";
+ String keyPrefix = "prefix/dir/0";
+ OSS oss = EasyMock.createStrictMock(OSS.class);
+
+ final ObjectListing result = new ObjectListing();
+ result.setTruncated(false);
+
+ EasyMock.expect(oss.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(result)
+ .once();
+ OssTimestampVersionedDataFinder finder = new OssTimestampVersionedDataFinder(oss);
+
+ Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
+
+ EasyMock.replay(oss);
+
+
+ URI latest = finder.getLatestVersion(URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, keyPrefix)), pattern);
+
+ EasyMock.verify(oss);
+
+ Assert.assertEquals(null, latest);
+ }
+
+ @Test
+ public void testFindSelf()
+ {
+ String bucket = "bucket";
+ String keyPrefix = "prefix/dir/0";
+ OSS ossClient = EasyMock.createStrictMock(OSS.class);
+
+ OSSObjectSummary object0 = new OSSObjectSummary();
+
+ object0.setBucketName(bucket);
+ object0.setKey(keyPrefix + "/renames-0.gz");
+ object0.setLastModified(new Date(0));
+ object0.setSize(10);
+
+ final ObjectListing result = new ObjectListing();
+ result.getObjectSummaries().add(object0);
+ result.setTruncated(false);
+
+ EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(result)
+ .once();
+ OssTimestampVersionedDataFinder finder = new OssTimestampVersionedDataFinder(ossClient);
+
+ Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
+
+ EasyMock.replay(ossClient);
+
+
+ URI latest = finder.getLatestVersion(URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, keyPrefix)), pattern);
+
+ EasyMock.verify(ossClient);
+
+ URI expected = URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, object0.getKey()));
+
+ Assert.assertEquals(expected, latest);
+ }
+
+ @Test
+ public void testFindExact()
+ {
+ String bucket = "bucket";
+ String keyPrefix = "prefix/dir/0";
+ OSS ossClient = EasyMock.createStrictMock(OSS.class);
+
+ OSSObjectSummary object0 = new OSSObjectSummary();
+
+ object0.setBucketName(bucket);
+ object0.setKey(keyPrefix + "/renames-0.gz");
+ object0.setLastModified(new Date(0));
+ object0.setSize(10);
+
+ final ObjectListing result = new ObjectListing();
+ result.getObjectSummaries().add(object0);
+ result.setTruncated(false);
+
+ EasyMock.expect(ossClient.listObjects(EasyMock.anyObject(ListObjectsRequest.class)))
+ .andReturn(result)
+ .once();
+ OssTimestampVersionedDataFinder finder = new OssTimestampVersionedDataFinder(ossClient);
+
+ EasyMock.replay(ossClient);
+
+ URI latest = finder.getLatestVersion(URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, object0.getKey())), null);
+
+ EasyMock.verify(ossClient);
+
+ URI expected = URI.create(StringUtils.format("%s://%s/%s", OssStorageDruidModule.SCHEME, bucket, object0.getKey()));
+
+ Assert.assertEquals(expected, latest);
+ }
+}
diff --git a/integration-tests/docker/environment-configs/override-examples/oss b/integration-tests/docker/environment-configs/override-examples/oss
new file mode 100644
index 00000000000..dc2cf89085d
--- /dev/null
+++ b/integration-tests/docker/environment-configs/override-examples/oss
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Example of override config file to provide.
+# Please replace with your cloud configs/credentials
+#
+druid_storage_type=oss
+druid_storage_oss_bucket=
+druid_storage_oss_prefix=
+druid_oss_accessKey=
+druid_oss_secretKey=
+druid_oss_endpoint=
+druid_extensions_loadList=["aliyun-oss-extensions"]
\ No newline at end of file
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 58a3a1b07a1..bc8e613730e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -76,6 +76,13 @@ public class TestNGGroup
*/
public static final String AZURE_DEEP_STORAGE = "azure-deep-storage";
+ /**
+ * This group is not part of CI. To run this group, Aliyun OSS configs/credentials must be provided in a file.
+ * The path of the file must then be passed to mvn with -Doverride.config.path=<PATH_TO_FILE>
+ * See integration-tests/docker/environment-configs/override-examples/oss for env vars to provide.
+ */
+ public static final String ALIYUN_OSS_DEEP_STORAGE = "aliyun-oss-deep-storage";
+
/**
* This group is not part of CI. To run this group, hadoop configs must be provided in a file. The path of the file
* must then be pass to mvn with -Doverride.config.path=
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractOssInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractOssInputSourceParallelIndexTest.java
new file mode 100644
index 00000000000..72b0d355b2a
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractOssInputSourceParallelIndexTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.testng.annotations.DataProvider;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+
+/**
+ * Base class for parallel-index integration tests that read their input from an "oss" input source.
+ * {@link #resources()} supplies the input-source variants and {@link #doTest(Pair)} substitutes one of them into
+ * the index task spec before delegating to {@code doIndexTest}.
+ */
+public abstract class AbstractOssInputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  /**
+   * Supplies one {@link Pair} per test invocation: the left side is the input-source property key
+   * ("uris", "prefixes" or "objects") and the right side is the list used as that property's value.
+   * The {@code %%BUCKET%%} and {@code %%PATH%%} placeholders are replaced in {@link #doTest(Pair)}.
+   */
+  @DataProvider
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "oss://%%BUCKET%%/%%PATH%%" + WIKIPEDIA_DATA_1,
+                        "oss://%%BUCKET%%/%%PATH%%" + WIKIPEDIA_DATA_2,
+                        "oss://%%BUCKET%%/%%PATH%%" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "oss://%%BUCKET%%/%%PATH%%"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  /**
+   * Runs the index task against a freshly named datasource, using {@code inputSource} as the "oss" input source.
+   *
+   * @param inputSource left side is the input-source property key, right side is the value that is serialized
+   *                    to JSON and substituted into the task spec
+   * @throws Exception if {@code doIndexTest} or closing the datasource unloader fails
+   */
+  void doTest(Pair<String, List> inputSource) throws Exception
+  {
+    final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored = unloader(indexDatasource + config.getExtraDatasourceNameSuffix())
+    ) {
+      // Typed as Function<String, String> so that `spec` is a String inside the lambda.
+      final Function<String, String> propsTransform = spec -> {
+        try {
+          // Serialize the input-source value first, then fill in the configured cloud bucket and path.
+          String inputSourceValue = jsonMapper.writeValueAsString(inputSource.rhs);
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%BUCKET%%",
+              config.getCloudBucket()
+          );
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%PATH%%",
+              config.getCloudPath()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT_TYPE%%",
+              InputFormatDetails.JSON.getInputFormatType()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_TYPE%%",
+              "oss"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_KEY%%",
+              inputSource.lhs
+          );
+          return StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+              inputSourceValue
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          INDEX_TASK,
+          propsTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+    }
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOssToOssParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOssToOssParallelIndexTest.java
new file mode 100644
index 00000000000..ea989598b4b
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOssToOssParallelIndexTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must:
+ * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and
+ *    -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file.
+ * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
+ *    located in integration-tests/src/test/resources/data/batch_index/json to your Aliyun OSS at the location set
+ *    in step 1.
+ * 3) Provide {@code -Doverride.config.path=<PATH_TO_FILE>} with Aliyun OSS credentials/configs set. See
+ *    integration-tests/docker/environment-configs/override-examples/oss for env vars to provide.
+ */
+@Test(groups = TestNGGroup.ALIYUN_OSS_DEEP_STORAGE)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITOssToOssParallelIndexTest extends AbstractOssInputSourceParallelIndexTest
+{
+  /**
+   * Runs the inherited {@code doTest} once for each input source supplied by the "resources" data provider.
+   *
+   * @param ossInputSource input-source property key (left) and its value list (right)
+   */
+  @Test(dataProvider = "resources")
+  public void testAliyunOssIndexData(Pair<String, List> ossInputSource) throws Exception
+  {
+    doTest(ossInputSource);
+  }
+}
diff --git a/pom.xml b/pom.xml
index bbceed61c7f..2de34e85103 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,6 +193,7 @@
         <module>extensions-contrib/tdigestsketch</module>
         <module>extensions-contrib/influxdb-emitter</module>
         <module>extensions-contrib/gce-extensions</module>
+        <module>extensions-contrib/aliyun-oss-extensions</module>
         <module>distribution</module>
diff --git a/website/.spelling b/website/.spelling
index d38f8f5f440..b5ff7c9fc99 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -574,6 +574,14 @@ thriftJar
- ../docs/development/extensions-contrib/time-min-max.md
timeMax
timeMin
+ - ../docs/development/extensions-contrib/aliyun-oss-extensions.md
+Aliyun
+aliyun
+OSS
+AccessKey
+aliyun-oss
+oss
+url
- ../docs/development/extensions-core/approximate-histograms.md
approxHistogram
approxHistogramFold
@@ -1757,4 +1765,4 @@ UserGroupInformation
CVE-2019-17571
CVE-2019-12399
CVE-2018-17196
-bin.tar.gz
+bin.tar.gz
\ No newline at end of file