mirror of https://github.com/apache/druid.git
implement Azure InputSource reader and deprecate Azure FireHose (#9306)
* IMPLY-1946: Improve code quality and unit test coverage of the Azure extension * Update unit tests to increase test coverage for the extension * Clean up any messy code * Enforce code coverage as part of tests. * * Update azure extension pom to remove unnecessary things * update jacoco thresholds * * upgrade version of azure-storage library used to most up-to-date version * implement Azure InputSource reader and deprecate Azure FireHose * implement azure InputSource reader * deprecate Azure FireHose implementation * * exclude common libraries that are included from druid core * Implement more of Azure input source. * * Add tests * * Add more tests * * deprecate azure firehose * * added more tests * * rollback fix for google cloud batch ingestion bug. Will be fixed in another PR. * * Added javadocs for all azure related classes * Addressed review comments * * Remove dependency on org.apache.commons:commons-collections4 * Fix LGTM warnings * Add com.google.inject.extensions:guice-assistedinject to licenses * * rename classes as suggested in review comments * * Address review comments * * Address review comments * * Address review comments
This commit is contained in:
parent
b2c00b3a79
commit
5c202343c9
|
@ -40,6 +40,7 @@ import java.util.stream.Stream;
|
|||
public abstract class CloudObjectInputSource<T extends InputEntity> extends AbstractInputSource
|
||||
implements SplittableInputSource<CloudObjectLocation>
|
||||
{
|
||||
protected static final int MAX_LISTING_LENGTH = 1024;
|
||||
private final List<URI> uris;
|
||||
private final List<URI> prefixes;
|
||||
private final List<CloudObjectLocation> objects;
|
||||
|
|
|
@ -91,6 +91,11 @@
|
|||
<artifactId>guice</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.inject.extensions</groupId>
|
||||
<artifactId>guice-assistedinject</artifactId>
|
||||
<version>${guice.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
|
@ -116,6 +121,11 @@
|
|||
<artifactId>validation-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
<artifactId>jsr305</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
@ -140,6 +150,11 @@
|
|||
<artifactId>easymock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>nl.jqno.equalsverifier</groupId>
|
||||
<artifactId>equalsverifier</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -160,36 +175,36 @@
|
|||
<limit>
|
||||
<counter>INSTRUCTION</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.79</minimum>
|
||||
<minimum>0.84</minimum>
|
||||
</limit>
|
||||
<limit>
|
||||
<counter>LINE</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.80</minimum>
|
||||
<minimum>0.84</minimum>
|
||||
</limit>
|
||||
<limit>
|
||||
<counter>BRANCH</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.70</minimum>
|
||||
<minimum>0.86</minimum>
|
||||
</limit>
|
||||
<limit>
|
||||
<counter>COMPLEXITY</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.73</minimum>
|
||||
<minimum>0.80</minimum>
|
||||
</limit>
|
||||
<limit>
|
||||
<counter>METHOD</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.76</minimum>
|
||||
<minimum>0.79</minimum>
|
||||
</limit>
|
||||
<limit>
|
||||
<counter>CLASS</counter>
|
||||
<value>COVEREDRATIO</value>
|
||||
<minimum>0.83</minimum>
|
||||
<minimum>0.90</minimum>
|
||||
</limit>
|
||||
</limits>
|
||||
</rule>
|
||||
</rules>
|
||||
</rules>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
|
@ -199,7 +214,7 @@
|
|||
<goal>report</goal>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import org.apache.druid.data.input.RetryingInputEntity;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.storage.azure.AzureByteSource;
|
||||
import org.apache.druid.storage.azure.AzureByteSourceFactory;
|
||||
import org.apache.druid.storage.azure.AzureUtils;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* Represents an azure based input resource and knows how to read bytes from the given resource.
|
||||
*/
|
||||
public class AzureEntity extends RetryingInputEntity
|
||||
{
|
||||
private final CloudObjectLocation location;
|
||||
private final AzureByteSource byteSource;
|
||||
|
||||
@AssistedInject
|
||||
AzureEntity(
|
||||
@Nonnull @Assisted CloudObjectLocation location,
|
||||
@Nonnull AzureByteSourceFactory byteSourceFactory
|
||||
)
|
||||
{
|
||||
this.location = location;
|
||||
this.byteSource = byteSourceFactory.create(location.getBucket(), location.getPath());
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri()
|
||||
{
|
||||
return location.toUri(AzureInputSource.SCHEME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Predicate<Throwable> getRetryCondition()
|
||||
{
|
||||
return AzureUtils.AZURE_RETRY;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputStream readFrom(long offset) throws IOException
|
||||
{
|
||||
// Get data of the given object and open an input stream
|
||||
return byteSource.openStream(offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getPath()
|
||||
{
|
||||
return location.getPath();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
|
||||
/**
 * Guice assisted-inject factory for creating {@link AzureEntity} objects.
 *
 * The {@link CloudObjectLocation} is caller-supplied; the remaining {@link AzureEntity}
 * constructor dependencies are provided by the injector.
 */
public interface AzureEntityFactory
{
  AzureEntity create(CloudObjectLocation location);
}
|
|
@ -0,0 +1,153 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.data.input.InputSplit;
|
||||
import org.apache.druid.data.input.impl.CloudObjectInputSource;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.data.input.impl.SplittableInputSource;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
|
||||
import org.apache.druid.storage.azure.AzureStorage;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* Abstracts the Azure storage system where input data is stored. Allows users to retrieve entities in
|
||||
* the storage system that match either a particular uri, prefix, or object.
|
||||
*/
|
||||
public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
|
||||
{
|
||||
@VisibleForTesting
|
||||
static final int MAX_LISTING_LENGTH = 1024;
|
||||
public static final String SCHEME = "azure";
|
||||
|
||||
private final AzureStorage storage;
|
||||
private final AzureEntityFactory entityFactory;
|
||||
private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
|
||||
private final AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
|
||||
|
||||
@JsonCreator
|
||||
public AzureInputSource(
|
||||
@JacksonInject AzureStorage storage,
|
||||
@JacksonInject AzureEntityFactory entityFactory,
|
||||
@JacksonInject AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
|
||||
@JacksonInject AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter,
|
||||
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
|
||||
)
|
||||
{
|
||||
super(SCHEME, uris, prefixes, objects);
|
||||
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
|
||||
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
|
||||
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
|
||||
azureCloudBlobIterableFactory,
|
||||
"AzureCloudBlobIterableFactory"
|
||||
);
|
||||
this.azureCloudBlobToLocationConverter = Preconditions.checkNotNull(azureCloudBlobToLocationConverter, "AzureCloudBlobToLocationConverter");
|
||||
}
|
||||
|
||||
@Override
|
||||
public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
|
||||
{
|
||||
return new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
null,
|
||||
null,
|
||||
ImmutableList.of(split.get())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "AzureInputSource{" +
|
||||
"uris=" + getUris() +
|
||||
", prefixes=" + getPrefixes() +
|
||||
", objects=" + getObjects() +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureEntity createEntity(InputSplit<CloudObjectLocation> split)
|
||||
{
|
||||
return entityFactory.create(split.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
|
||||
{
|
||||
return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false)
|
||||
.map(o -> azureCloudBlobToLocationConverter.createCloudObjectLocation(o))
|
||||
.map(InputSplit::new);
|
||||
}
|
||||
|
||||
private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
|
||||
{
|
||||
return azureCloudBlobIterableFactory.create(getPrefixes(), MAX_LISTING_LENGTH);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(
|
||||
super.hashCode(),
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
AzureInputSource that = (AzureInputSource) o;
|
||||
return storage.equals(that.storage) &&
|
||||
entityFactory.equals(that.entityFactory) &&
|
||||
azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) &&
|
||||
azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter);
|
||||
}
|
||||
}
|
|
@ -25,7 +25,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
/**
|
||||
* Represents an Azure based blob. Used with {@link StaticAzureBlobStoreFirehoseFactory}.
|
||||
*
|
||||
* @deprecated as of version 0.18.0 because support for firehose has been discontinued. Please use
|
||||
* {@link org.apache.druid.data.input.azure.AzureEntity} with {@link org.apache.druid.data.input.azure.AzureInputSource}
|
||||
* instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public class AzureBlob
|
||||
{
|
||||
@JsonProperty
|
||||
|
@ -57,9 +64,9 @@ public class AzureBlob
|
|||
public String toString()
|
||||
{
|
||||
return "AzureBlob{"
|
||||
+ "container=" + container
|
||||
+ ",path=" + path
|
||||
+ "}";
|
||||
+ "container=" + container
|
||||
+ ",path=" + path
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,11 @@ import java.util.Objects;
|
|||
|
||||
/**
|
||||
* This class is heavily inspired by the StaticS3FirehoseFactory class in the org.apache.druid.firehose.s3 package
|
||||
*
|
||||
* @deprecated as of version 0.18.0 because support for firehose has been discontinued. Please use
|
||||
* {@link org.apache.druid.data.input.azure.AzureInputSource} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<AzureBlob>
|
||||
{
|
||||
private final AzureStorage azureStorage;
|
||||
|
|
|
@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import javax.validation.constraints.Min;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* Stores the configuration for an Azure account.
|
||||
*/
|
||||
public class AzureAccountConfig
|
||||
{
|
||||
@JsonProperty
|
||||
|
|
|
@ -20,23 +20,30 @@
|
|||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.common.io.ByteSource;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Used for getting an {@link InputStream} to an azure resource.
|
||||
*/
|
||||
public class AzureByteSource extends ByteSource
|
||||
{
|
||||
|
||||
private static final Logger log = new Logger(AzureByteSource.class);
|
||||
private final AzureStorage azureStorage;
|
||||
private final String containerName;
|
||||
private final String blobPath;
|
||||
|
||||
@AssistedInject
|
||||
public AzureByteSource(
|
||||
AzureStorage azureStorage,
|
||||
String containerName,
|
||||
String blobPath
|
||||
@Assisted("containerName") String containerName,
|
||||
@Assisted("blobPath") String blobPath
|
||||
)
|
||||
{
|
||||
this.azureStorage = azureStorage;
|
||||
|
@ -46,11 +53,19 @@ public class AzureByteSource extends ByteSource
|
|||
|
||||
@Override
|
||||
public InputStream openStream() throws IOException
|
||||
{
|
||||
return openStream(0L);
|
||||
}
|
||||
|
||||
public InputStream openStream(long offset) throws IOException
|
||||
{
|
||||
try {
|
||||
return azureStorage.getBlobInputStream(containerName, blobPath);
|
||||
return azureStorage.getBlobInputStream(offset, containerName, blobPath);
|
||||
}
|
||||
catch (StorageException | URISyntaxException e) {
|
||||
log.warn("Exception when opening stream to azure resource, containerName: %s, blobPath: %s, Error: %s",
|
||||
containerName, blobPath, e.getMessage()
|
||||
);
|
||||
if (AzureUtils.AZURE_RETRY.apply(e)) {
|
||||
throw new IOException("Recoverable exception", e);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
/**
 * Guice assisted-inject factory used to create {@link AzureByteSource} objects.
 *
 * Both parameters are assisted (caller-supplied); the {@code @Assisted} names disambiguate the two
 * String arguments for Guice. Remaining constructor dependencies come from the injector.
 */
public interface AzureByteSourceFactory
{
  AzureByteSource create(@Assisted("containerName") String containerName, @Assisted("blobPath") String blobPath);
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
|
||||
/**
|
||||
* Converts a {@link CloudBlobHolder} object to a {@link CloudObjectLocation} object
|
||||
*/
|
||||
public class AzureCloudBlobHolderToCloudObjectLocationConverter
|
||||
implements ICloudSpecificObjectToCloudObjectLocationConverter<CloudBlobHolder>
|
||||
{
|
||||
@Override
|
||||
public CloudObjectLocation createCloudObjectLocation(CloudBlobHolder cloudBlob)
|
||||
{
|
||||
try {
|
||||
return new CloudObjectLocation(cloudBlob.getContainerName(), cloudBlob.getName());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RE(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* {@link Iterable} for {@link CloudBlobHolder} objects.
|
||||
*/
|
||||
public class AzureCloudBlobIterable implements Iterable<CloudBlobHolder>
|
||||
{
|
||||
private final Iterable<URI> prefixes;
|
||||
private final int maxListingLength;
|
||||
private final AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory;
|
||||
|
||||
@AssistedInject
|
||||
public AzureCloudBlobIterable(
|
||||
AzureCloudBlobIteratorFactory azureCloudBlobIteratorFactory,
|
||||
@Assisted final Iterable<URI> prefixes,
|
||||
@Assisted final int maxListingLength
|
||||
)
|
||||
{
|
||||
this.azureCloudBlobIteratorFactory = azureCloudBlobIteratorFactory;
|
||||
this.prefixes = prefixes;
|
||||
this.maxListingLength = maxListingLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<CloudBlobHolder> iterator()
|
||||
{
|
||||
return azureCloudBlobIteratorFactory.create(prefixes, maxListingLength);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
/**
 * Guice assisted-inject factory for creating {@link AzureCloudBlobIterable} objects.
 *
 * @see AzureCloudBlobIterable for the semantics of {@code prefixes} and {@code maxListingLength}
 */
public interface AzureCloudBlobIterableFactory
{
  AzureCloudBlobIterable create(Iterable<URI> prefixes, int maxListingLength);
}
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import com.microsoft.azure.storage.ResultContinuation;
|
||||
import com.microsoft.azure.storage.ResultSegment;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
/**
 * Iterator over the blobs found under a set of Azure prefix URIs.
 *
 * This iterator is computed incrementally in batches of {@link #maxListingLength}: each prefix is
 * listed segment-by-segment via Azure continuation tokens. The first listing call is made at the
 * same time the iterator is constructed, so construction can throw if the initial request fails.
 * Directory placeholder entries are skipped; only real cloud blobs are returned.
 */
public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
{
  private static final Logger log = new Logger(AzureCloudBlobIterator.class);
  private final AzureStorage storage;
  private final ListBlobItemHolderFactory blobItemDruidFactory;
  private final Iterator<URI> prefixesIterator;
  private final int maxListingLength;

  // Listing state for the prefix currently being walked.
  private ResultSegment<ListBlobItem> result;
  private String currentContainer;
  private String currentPrefix;
  // Azure continuation token for the next segment; null means the current prefix is exhausted.
  private ResultContinuation continuationToken;
  // Next element to hand out; null means iteration is finished.
  private CloudBlobHolder currentBlobItem;
  private Iterator<ListBlobItem> blobItemIterator;
  private final AzureAccountConfig config;

  @AssistedInject
  AzureCloudBlobIterator(
      AzureStorage storage,
      ListBlobItemHolderFactory blobItemDruidFactory,
      AzureAccountConfig config,
      @Assisted final Iterable<URI> prefixes,
      @Assisted final int maxListingLength
  )
  {
    this.storage = storage;
    this.blobItemDruidFactory = blobItemDruidFactory;
    this.config = config;
    this.prefixesIterator = prefixes.iterator();
    this.maxListingLength = maxListingLength;
    this.result = null;
    this.currentContainer = null;
    this.currentPrefix = null;
    this.continuationToken = null;
    this.currentBlobItem = null;
    this.blobItemIterator = null;

    // Eagerly fetch the first batch so hasNext() is accurate immediately.
    if (prefixesIterator.hasNext()) {
      prepareNextRequest();
      fetchNextBatch();
      advanceBlobItem();
    }
  }

  @Override
  public boolean hasNext()
  {
    return currentBlobItem != null;
  }

  @Override
  public CloudBlobHolder next()
  {
    if (currentBlobItem == null) {
      throw new NoSuchElementException();
    }

    final CloudBlobHolder retVal = currentBlobItem;
    advanceBlobItem();
    return retVal;
  }

  /**
   * Moves to the next prefix: derives container and key from its URI and resets per-prefix state.
   * Caller must ensure {@link #prefixesIterator} has a next element.
   */
  private void prepareNextRequest()
  {
    URI currentUri = prefixesIterator.next();
    currentContainer = currentUri.getAuthority();
    currentPrefix = AzureUtils.extractAzureKey(currentUri);
    log.debug("prepareNextRequest:\ncurrentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
              currentUri, currentContainer, currentPrefix
    );
    result = null;
    continuationToken = null;
  }

  /**
   * Fetches the next listing segment for the current container/prefix, retrying per
   * {@link AzureUtils#retryAzureOperation} up to the configured max tries. Updates the
   * continuation token and blob-item iterator; wraps any failure as an unchecked {@link RE}.
   */
  private void fetchNextBatch()
  {
    try {
      result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented(
          currentContainer,
          currentPrefix,
          continuationToken,
          maxListingLength
      ), config.getMaxTries());
      continuationToken = result.getContinuationToken();
      blobItemIterator = result.getResults().iterator();
    }
    catch (Exception e) {
      throw new RE(
          e,
          "Failed to get blob item from Azure container[%s], prefix[%s]. Error: %s",
          currentContainer,
          currentPrefix,
          e.getMessage()
      );
    }
  }

  /**
   * Advance blobItemIterator to the next non-placeholder blob, updating "currentBlobItem".
   * Fetches further segments (and further prefixes) as needed; sets currentBlobItem to null
   * when everything is exhausted.
   */
  private void advanceBlobItem()
  {
    while (blobItemIterator.hasNext() || continuationToken != null || prefixesIterator.hasNext()) {
      while (blobItemIterator.hasNext()) {
        ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next());
        /* skip directory objects */
        if (blobItem.isCloudBlob()) {
          currentBlobItem = blobItem.getCloudBlob();
          return;
        }
      }

      if (continuationToken != null) {
        // More segments remain for the current prefix.
        fetchNextBatch();
      } else if (prefixesIterator.hasNext()) {
        // Current prefix exhausted; move on to the next one.
        prepareNextRequest();
        fetchNextBatch();
      }
    }

    // Truly nothing left to read.
    currentBlobItem = null;
  }
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
/**
 * Guice assisted-inject factory for creating {@link AzureCloudBlobIterator} objects.
 *
 * @see AzureCloudBlobIterator for the semantics of {@code prefixes} and {@code maxListingLength}
 */
public interface AzureCloudBlobIteratorFactory
{
  AzureCloudBlobIterator create(Iterable<URI> prefixes, int maxListingLength);
}
|
|
@ -31,6 +31,9 @@ import java.net.URISyntaxException;
|
|||
import java.nio.file.Paths;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Used for removing segment files stored in Azure based deep storage
|
||||
*/
|
||||
public class AzureDataSegmentKiller implements DataSegmentKiller
|
||||
{
|
||||
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.druid.utils.CompressionUtils;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Used for Reading segment files stored in Azure based deep storage
|
||||
*/
|
||||
public class AzureDataSegmentPuller
|
||||
{
|
||||
private static final Logger log = new Logger(AzureDataSegmentPuller.class);
|
||||
|
@ -40,14 +43,13 @@ public class AzureDataSegmentPuller
|
|||
|
||||
static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
|
||||
|
||||
private final AzureStorage azureStorage;
|
||||
private final AzureByteSourceFactory byteSourceFactory;
|
||||
|
||||
@Inject
|
||||
public AzureDataSegmentPuller(
|
||||
AzureStorage azureStorage
|
||||
)
|
||||
AzureByteSourceFactory byteSourceFactory)
|
||||
{
|
||||
this.azureStorage = azureStorage;
|
||||
this.byteSourceFactory = byteSourceFactory;
|
||||
}
|
||||
|
||||
FileUtils.FileCopyResult getSegmentFiles(
|
||||
|
@ -64,17 +66,9 @@ public class AzureDataSegmentPuller
|
|||
"Loading container: [%s], with blobPath: [%s] and outDir: [%s]", containerName, blobPath, outDir
|
||||
);
|
||||
|
||||
boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS);
|
||||
final String actualBlobPath;
|
||||
if (blobPathIsHadoop) {
|
||||
// Remove azure's hadoop prefix to match realtime ingestion path
|
||||
actualBlobPath = blobPath.substring(
|
||||
blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1);
|
||||
} else {
|
||||
actualBlobPath = blobPath;
|
||||
}
|
||||
final String actualBlobPath = AzureUtils.maybeRemoveAzurePathPrefix(blobPath);
|
||||
|
||||
final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, actualBlobPath);
|
||||
final ByteSource byteSource = byteSourceFactory.create(containerName, actualBlobPath);
|
||||
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
|
||||
byteSource,
|
||||
outDir,
|
||||
|
|
|
@ -39,6 +39,9 @@ import java.net.URISyntaxException;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Used for writing segment files to Azure based deep storage
|
||||
*/
|
||||
public class AzureDataSegmentPusher implements DataSegmentPusher
|
||||
{
|
||||
private static final Logger log = new Logger(AzureDataSegmentPusher.class);
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
|
|||
|
||||
import java.io.File;
|
||||
|
||||
/**
|
||||
* A means of pulling segment files into Azure based deep storage
|
||||
*/
|
||||
@JsonTypeName(AzureStorageDruidModule.SCHEME)
|
||||
public class AzureLoadSpec implements LoadSpec
|
||||
{
|
||||
|
|
|
@ -20,7 +20,10 @@
|
|||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.microsoft.azure.storage.ResultContinuation;
|
||||
import com.microsoft.azure.storage.ResultSegment;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
@ -33,10 +36,15 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Abstracts the Azure storage layer. Makes direct calls to Azure file system.
|
||||
*/
|
||||
public class AzureStorage
|
||||
{
|
||||
private static final boolean USE_FLAT_BLOB_LISTING = true;
|
||||
|
||||
private final Logger log = new Logger(AzureStorage.class);
|
||||
|
||||
|
@ -109,6 +117,29 @@ public class AzureStorage
|
|||
return this.cloudBlobClient;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ResultSegment<ListBlobItem> listBlobsWithPrefixInContainerSegmented(
|
||||
final String containerName,
|
||||
final String prefix,
|
||||
ResultContinuation continuationToken,
|
||||
int maxResults
|
||||
) throws StorageException, URISyntaxException
|
||||
{
|
||||
CloudBlobContainer cloudBlobContainer = cloudBlobClient.getContainerReference(containerName);
|
||||
return cloudBlobContainer
|
||||
.listBlobsSegmented(
|
||||
prefix,
|
||||
/* Use flat blob listing here so that we get only blob types and not directories.*/
|
||||
USE_FLAT_BLOB_LISTING,
|
||||
EnumSet
|
||||
.noneOf(BlobListingDetails.class),
|
||||
maxResults,
|
||||
continuationToken,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private CloudBlobContainer getOrCreateCloudBlobContainer(final String containerName)
|
||||
throws StorageException, URISyntaxException
|
||||
{
|
||||
|
|
|
@ -26,19 +26,26 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.assistedinject.FactoryModuleBuilder;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import org.apache.druid.data.input.azure.AzureEntityFactory;
|
||||
import org.apache.druid.data.input.azure.AzureInputSource;
|
||||
import org.apache.druid.firehose.azure.StaticAzureBlobStoreFirehoseFactory;
|
||||
import org.apache.druid.guice.Binders;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
import org.apache.druid.guice.LazySingleton;
|
||||
import org.apache.druid.initialization.DruidModule;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Binds objects related to dealing with the Azure file system.
|
||||
*/
|
||||
public class AzureStorageDruidModule implements DruidModule
|
||||
{
|
||||
|
||||
|
@ -71,7 +78,9 @@ public class AzureStorageDruidModule implements DruidModule
|
|||
}
|
||||
},
|
||||
new SimpleModule().registerSubtypes(
|
||||
new NamedType(StaticAzureBlobStoreFirehoseFactory.class, "static-azure-blobstore"))
|
||||
new NamedType(StaticAzureBlobStoreFirehoseFactory.class, "static-azure-blobstore"),
|
||||
new NamedType(AzureInputSource.class, SCHEME)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -91,6 +100,17 @@ public class AzureStorageDruidModule implements DruidModule
|
|||
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class);
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.logs", AzureTaskLogsConfig.class);
|
||||
binder.bind(AzureTaskLogs.class).in(LazySingleton.class);
|
||||
binder.bind(AzureCloudBlobHolderToCloudObjectLocationConverter.class).in(LazySingleton.class);
|
||||
binder.install(new FactoryModuleBuilder()
|
||||
.build(AzureByteSourceFactory.class));
|
||||
binder.install(new FactoryModuleBuilder()
|
||||
.build(AzureEntityFactory.class));
|
||||
binder.install(new FactoryModuleBuilder()
|
||||
.build(AzureCloudBlobIteratorFactory.class));
|
||||
binder.install(new FactoryModuleBuilder()
|
||||
.build(AzureCloudBlobIterableFactory.class));
|
||||
binder.install(new FactoryModuleBuilder()
|
||||
.build(ListBlobItemHolderFactory.class));
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -33,6 +33,9 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Deals with reading and writing task logs stored in Azure.
|
||||
*/
|
||||
public class AzureTaskLogs implements TaskLogs
|
||||
{
|
||||
|
||||
|
|
|
@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import javax.validation.constraints.Min;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* Stores the configuration for writing task logs to Azure .
|
||||
*/
|
||||
public class AzureTaskLogsConfig
|
||||
{
|
||||
@JsonProperty
|
||||
|
|
|
@ -19,17 +19,26 @@
|
|||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import org.apache.druid.java.util.common.RetryUtils;
|
||||
import org.apache.druid.java.util.common.RetryUtils.Task;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Utility class for miscellaneous things involving Azure.
|
||||
*/
|
||||
public class AzureUtils
|
||||
{
|
||||
|
||||
@VisibleForTesting
|
||||
static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
|
||||
|
||||
public static final Predicate<Throwable> AZURE_RETRY = e -> {
|
||||
if (e instanceof URISyntaxException) {
|
||||
return false;
|
||||
|
@ -46,6 +55,36 @@ public class AzureUtils
|
|||
return false;
|
||||
};
|
||||
|
||||
/**
|
||||
* extracts the path component of the supplied uri with any leading '/' characters removed.
|
||||
* @param uri the uri to extract the path for
|
||||
* @return a String representing the path component of the uri with any leading '/'
|
||||
* characters removed.
|
||||
*/
|
||||
public static String extractAzureKey(URI uri)
|
||||
{
|
||||
return StringUtils.maybeRemoveLeadingSlash(uri.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* extracts the blob path component of the supplied uri with any leading 'blob.core.windows.net/' string removed.
|
||||
* @param blobPath the path of the blob
|
||||
* @return a String representing the blob path component of the uri with any leading 'blob.core.windows.net/' string
|
||||
* removed characters removed.
|
||||
*/
|
||||
public static String maybeRemoveAzurePathPrefix(String blobPath)
|
||||
{
|
||||
boolean blobPathIsHadoop = blobPath.contains(AZURE_STORAGE_HOST_ADDRESS);
|
||||
|
||||
if (blobPathIsHadoop) {
|
||||
// Remove azure's hadoop prefix to match realtime ingestion path
|
||||
return blobPath.substring(
|
||||
blobPath.indexOf(AZURE_STORAGE_HOST_ADDRESS) + AZURE_STORAGE_HOST_ADDRESS.length() + 1);
|
||||
} else {
|
||||
return blobPath;
|
||||
}
|
||||
}
|
||||
|
||||
static <T> T retryAzureOperation(Task<T> f, int maxTries) throws Exception
|
||||
{
|
||||
return RetryUtils.retry(f, AZURE_RETRY, maxTries);
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
|
||||
/**
|
||||
* Interface for converting between from some object, T, and a {@link CloudObjectLocation} object
|
||||
* @param <T> The object to convert to a {@link CloudObjectLocation} object
|
||||
*/
|
||||
public interface ICloudSpecificObjectToCloudObjectLocationConverter<T>
|
||||
{
|
||||
CloudObjectLocation createCloudObjectLocation(T cloudSpecificImpl);
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure.blob;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob}
|
||||
* is a final class and so is difficult to mock in unit tests.
|
||||
*/
|
||||
public class CloudBlobHolder
|
||||
{
|
||||
private final CloudBlob delegate;
|
||||
|
||||
public CloudBlobHolder(CloudBlob delegate)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
public String getContainerName() throws URISyntaxException, StorageException
|
||||
{
|
||||
return delegate.getContainer().getName();
|
||||
}
|
||||
|
||||
public String getName()
|
||||
{
|
||||
return delegate.getName();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure.blob;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Wrapper class for {@link ListBlobItem} interface, which was missing some useful
|
||||
* functionality for telling whether the blob was a cloudBlob or not. This class was
|
||||
* added mainly to make testing easier.
|
||||
*/
|
||||
public class ListBlobItemHolder
|
||||
{
|
||||
private final ListBlobItem delegate;
|
||||
|
||||
@AssistedInject
|
||||
public ListBlobItemHolder(@Assisted ListBlobItem delegate)
|
||||
{
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
public String getContainerName() throws URISyntaxException, StorageException
|
||||
{
|
||||
return delegate.getContainer().getName();
|
||||
}
|
||||
|
||||
public URI getUri()
|
||||
{
|
||||
return delegate.getUri();
|
||||
}
|
||||
|
||||
public CloudBlobHolder getCloudBlob()
|
||||
{
|
||||
return new CloudBlobHolder((CloudBlob) delegate);
|
||||
}
|
||||
|
||||
public boolean isCloudBlob()
|
||||
{
|
||||
return delegate instanceof CloudBlob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return delegate.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure.blob;
|
||||
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
|
||||
/**
|
||||
* Factory for creating {@link ListBlobItemHolder} objects
|
||||
*/
|
||||
public interface ListBlobItemHolderFactory
|
||||
{
|
||||
ListBlobItemHolder create(ListBlobItem blobItem);
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.apache.commons.io.input.NullInputStream;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.storage.azure.AzureByteSource;
|
||||
import org.apache.druid.storage.azure.AzureByteSourceFactory;
|
||||
import org.apache.druid.storage.azure.AzureUtils;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
|
||||
public class AzureEntityTest extends EasyMockSupport
|
||||
{
|
||||
private static final String CONTAINER_NAME = "container";
|
||||
private static final String BLOB_NAME = "blob";
|
||||
private static final int OFFSET = 20;
|
||||
private static final InputStream INPUT_STREAM = new NullInputStream(OFFSET);
|
||||
private static final IOException IO_EXCEPTION = new IOException();
|
||||
private static final URI ENTITY_URI;
|
||||
|
||||
private CloudObjectLocation location;
|
||||
private AzureByteSourceFactory byteSourceFactory;
|
||||
private AzureByteSource byteSource;
|
||||
|
||||
private AzureEntity azureEntity;
|
||||
|
||||
static {
|
||||
try {
|
||||
ENTITY_URI = new URI(AzureInputSource.SCHEME + "://" + CONTAINER_NAME + "/" + BLOB_NAME);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
location = createMock(CloudObjectLocation.class);
|
||||
byteSourceFactory = createMock(AzureByteSourceFactory.class);
|
||||
byteSource = createMock(AzureByteSource.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getUri_returnsLocationUri()
|
||||
{
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
EasyMock.expect(location.toUri(AzureInputSource.SCHEME)).andReturn(ENTITY_URI);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
|
||||
URI actualUri = azureEntity.getUri();
|
||||
Assert.assertEquals(ENTITY_URI, actualUri);
|
||||
|
||||
verifyAll();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_readFromStart_returnsExpectedStream() throws Exception
|
||||
{
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
|
||||
EasyMock.expect(byteSource.openStream(0)).andReturn(INPUT_STREAM);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
|
||||
InputStream actualInputStream = azureEntity.readFrom(0);
|
||||
Assert.assertSame(INPUT_STREAM, actualInputStream);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_readFrom_returnsExpectedStream() throws Exception
|
||||
{
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
|
||||
EasyMock.expect(byteSource.openStream(OFFSET)).andReturn(INPUT_STREAM);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
|
||||
InputStream actualInputStream = azureEntity.readFrom(OFFSET);
|
||||
Assert.assertSame(INPUT_STREAM, actualInputStream);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_readFrom_throwsIOException_propogatesError()
|
||||
{
|
||||
try {
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME);
|
||||
EasyMock.expect(byteSource.openStream(OFFSET)).andThrow(IO_EXCEPTION);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
azureEntity.readFrom(OFFSET);
|
||||
}
|
||||
catch (IOException e) {
|
||||
verifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getPath_returnsLocationPath()
|
||||
{
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
String actualPath = azureEntity.getPath();
|
||||
|
||||
Assert.assertEquals(BLOB_NAME, actualPath);
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getRetryCondition_returnsExpectedRetryCondition()
|
||||
{
|
||||
EasyMock.expect(location.getBucket()).andReturn(CONTAINER_NAME);
|
||||
EasyMock.expect(location.getPath()).andReturn(BLOB_NAME).atLeastOnce();
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_NAME)).andReturn(byteSource);
|
||||
replayAll();
|
||||
|
||||
azureEntity = new AzureEntity(location, byteSourceFactory);
|
||||
Predicate<Throwable> actualRetryCondition = azureEntity.getRetryCondition();
|
||||
Assert.assertSame(AzureUtils.AZURE_RETRY, actualRetryCondition);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
|
||||
import org.apache.druid.storage.azure.AzureStorage;
|
||||
import org.apache.druid.storage.azure.AzureStorageDruidModule;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
public class AzureInputSourceSerdeTest extends EasyMockSupport
|
||||
{
|
||||
private static final String JSON_WITH_URIS = "{\n"
|
||||
+ " \"type\": \"azure\",\n"
|
||||
+ " \"uris\": [\"azure://datacontainer2/wikipedia.json\"]\n"
|
||||
+ "}";
|
||||
|
||||
private static final String JSON_WITH_PREFIXES = "{\n"
|
||||
+ " \"type\": \"azure\",\n"
|
||||
+ " \"prefixes\": [\"azure://datacontainer2\"]\n"
|
||||
+ "}";
|
||||
|
||||
private static final String JSON_WITH_OBJECTS = "{\n"
|
||||
+ " \"type\": \"azure\",\n"
|
||||
+ " \"objects\": [\n"
|
||||
+ " { \"bucket\": \"container1\", \"path\": \"bar/file1.json\"},\n"
|
||||
+ " { \"bucket\": \"conatiner2\", \"path\": \"foo/file2.json\"}\n"
|
||||
+ " ]\n"
|
||||
+ " }";
|
||||
|
||||
private static final List<URI> EXPECTED_URIS;
|
||||
private static final List<URI> EXPECTED_PREFIXES;
|
||||
private static final List<CloudObjectLocation> EXPECTED_CLOUD_OBJECTS;
|
||||
|
||||
private AzureStorage azureStorage;
|
||||
private AzureEntityFactory entityFactory;
|
||||
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
|
||||
private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
|
||||
|
||||
static {
|
||||
try {
|
||||
EXPECTED_URIS = ImmutableList.of(new URI("azure://datacontainer2/wikipedia.json"));
|
||||
EXPECTED_PREFIXES = ImmutableList.of(new URI("azure://datacontainer2"));
|
||||
EXPECTED_CLOUD_OBJECTS = ImmutableList.of(
|
||||
new CloudObjectLocation("container1", "bar/file1.json"),
|
||||
new CloudObjectLocation("conatiner2", "foo/file2.json")
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
azureStorage = createMock(AzureStorage.class);
|
||||
entityFactory = createMock(AzureEntityFactory.class);
|
||||
azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
|
||||
azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_uriSerde_constructsProperAzureInputSource() throws Exception
|
||||
{
|
||||
final InjectableValues.Std injectableValues = new InjectableValues.Std();
|
||||
injectableValues.addValue(AzureStorage.class, azureStorage);
|
||||
injectableValues.addValue(AzureEntityFactory.class, entityFactory);
|
||||
injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
|
||||
injectableValues.addValue(
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter.class,
|
||||
azureCloudBlobToLocationConverter
|
||||
);
|
||||
final ObjectMapper objectMapper = new DefaultObjectMapper()
|
||||
.registerModules(new AzureStorageDruidModule().getJacksonModules());
|
||||
objectMapper.setInjectableValues(injectableValues);
|
||||
|
||||
final AzureInputSource inputSource = objectMapper.readValue(JSON_WITH_URIS, AzureInputSource.class);
|
||||
verifyInputSourceWithUris(inputSource);
|
||||
|
||||
final AzureInputSource roundTripInputSource = objectMapper.readValue(
|
||||
objectMapper.writeValueAsBytes(inputSource),
|
||||
AzureInputSource.class);
|
||||
verifyInputSourceWithUris(roundTripInputSource);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_prefixSerde_constructsProperAzureInputSource() throws Exception
|
||||
{
|
||||
final InjectableValues.Std injectableValues = new InjectableValues.Std();
|
||||
injectableValues.addValue(AzureStorage.class, azureStorage);
|
||||
injectableValues.addValue(AzureEntityFactory.class, entityFactory);
|
||||
injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
|
||||
injectableValues.addValue(
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter.class,
|
||||
azureCloudBlobToLocationConverter
|
||||
);
|
||||
final ObjectMapper objectMapper = new DefaultObjectMapper()
|
||||
.registerModules(new AzureStorageDruidModule().getJacksonModules());
|
||||
objectMapper.setInjectableValues(injectableValues);
|
||||
|
||||
final AzureInputSource inputSource = objectMapper.readValue(JSON_WITH_PREFIXES, AzureInputSource.class);
|
||||
verifyInputSourceWithPrefixes(inputSource);
|
||||
|
||||
final AzureInputSource roundTripInputSource = objectMapper.readValue(
|
||||
objectMapper.writeValueAsBytes(inputSource),
|
||||
AzureInputSource.class);
|
||||
verifyInputSourceWithPrefixes(roundTripInputSource);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_objectSerde_constructsProperAzureInputSource() throws Exception
|
||||
{
|
||||
final InjectableValues.Std injectableValues = new InjectableValues.Std();
|
||||
injectableValues.addValue(AzureStorage.class, azureStorage);
|
||||
injectableValues.addValue(AzureEntityFactory.class, entityFactory);
|
||||
injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
|
||||
injectableValues.addValue(
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter.class,
|
||||
azureCloudBlobToLocationConverter
|
||||
);
|
||||
final ObjectMapper objectMapper = new DefaultObjectMapper()
|
||||
.registerModules(new AzureStorageDruidModule().getJacksonModules());
|
||||
objectMapper.setInjectableValues(injectableValues);
|
||||
|
||||
final AzureInputSource inputSource = objectMapper.readValue(JSON_WITH_OBJECTS, AzureInputSource.class);
|
||||
verifyInputSourceWithObjects(inputSource);
|
||||
|
||||
final AzureInputSource roundTripInputSource = objectMapper.readValue(
|
||||
objectMapper.writeValueAsBytes(inputSource),
|
||||
AzureInputSource.class);
|
||||
verifyInputSourceWithObjects(roundTripInputSource);
|
||||
|
||||
}
|
||||
|
||||
private static void verifyInputSourceWithUris(final AzureInputSource inputSource)
|
||||
{
|
||||
|
||||
Assert.assertEquals(EXPECTED_URIS, inputSource.getUris());
|
||||
Assert.assertNull(inputSource.getPrefixes());
|
||||
Assert.assertNull(inputSource.getObjects());
|
||||
}
|
||||
|
||||
private static void verifyInputSourceWithPrefixes(final AzureInputSource inputSource)
|
||||
{
|
||||
|
||||
Assert.assertNull(inputSource.getUris());
|
||||
Assert.assertEquals(EXPECTED_PREFIXES, inputSource.getPrefixes());
|
||||
Assert.assertNull(inputSource.getObjects());
|
||||
}
|
||||
|
||||
private static void verifyInputSourceWithObjects(final AzureInputSource inputSource)
|
||||
{
|
||||
Assert.assertNull(inputSource.getUris());
|
||||
Assert.assertNull(inputSource.getPrefixes());
|
||||
Assert.assertEquals(EXPECTED_CLOUD_OBJECTS, inputSource.getObjects());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.data.input.azure;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.data.input.InputSplit;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.data.input.impl.SplittableInputSource;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobIterable;
|
||||
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
|
||||
import org.apache.druid.storage.azure.AzureStorage;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class AzureInputSourceTest extends EasyMockSupport
|
||||
{
|
||||
private static final String CONTAINER_NAME = "container";
|
||||
private static final String BLOB_NAME = "blob";
|
||||
private static final URI PREFIX_URI;
|
||||
private final List<URI> EMPTY_URIS = ImmutableList.of();
|
||||
private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
|
||||
private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
|
||||
private static final String CONTAINER = "CONTAINER";
|
||||
private static final String BLOB_PATH = "BLOB_PATH";
|
||||
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
|
||||
|
||||
private AzureStorage storage;
|
||||
private AzureEntityFactory entityFactory;
|
||||
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
|
||||
private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
|
||||
|
||||
private InputSplit<CloudObjectLocation> inputSplit;
|
||||
private AzureEntity azureEntity1;
|
||||
private CloudBlobHolder cloudBlobDruid1;
|
||||
private AzureCloudBlobIterable azureCloudBlobIterable;
|
||||
|
||||
private AzureInputSource azureInputSource;
|
||||
|
||||
static {
|
||||
try {
|
||||
PREFIX_URI = new URI(AzureInputSource.SCHEME + "://" + CONTAINER_NAME + "/" + BLOB_NAME);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
storage = createMock(AzureStorage.class);
|
||||
entityFactory = createMock(AzureEntityFactory.class);
|
||||
inputSplit = createMock(InputSplit.class);
|
||||
azureEntity1 = createMock(AzureEntity.class);
|
||||
azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
|
||||
azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
|
||||
cloudBlobDruid1 = createMock(CloudBlobHolder.class);
|
||||
azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void test_constructor_emptyUrisEmptyPrefixesEmptyObjects_throwsIllegalArgumentException()
|
||||
{
|
||||
replayAll();
|
||||
azureInputSource = new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
EMPTY_URIS,
|
||||
EMPTY_PREFIXES,
|
||||
EMPTY_OBJECTS
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_createEntity_returnsExpectedEntity()
|
||||
{
|
||||
EasyMock.expect(entityFactory.create(CLOUD_OBJECT_LOCATION_1)).andReturn(azureEntity1);
|
||||
EasyMock.expect(inputSplit.get()).andReturn(CLOUD_OBJECT_LOCATION_1);
|
||||
replayAll();
|
||||
|
||||
List<CloudObjectLocation> objects = ImmutableList.of(CLOUD_OBJECT_LOCATION_1);
|
||||
azureInputSource = new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
EMPTY_URIS,
|
||||
EMPTY_PREFIXES,
|
||||
objects
|
||||
);
|
||||
|
||||
AzureEntity actualAzureEntity = azureInputSource.createEntity(inputSplit);
|
||||
Assert.assertSame(azureEntity1, actualAzureEntity);
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getPrefixesSplitStream_successfullyCreatesCloudLocation_returnsExpectedLocations()
|
||||
{
|
||||
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
|
||||
List<CloudObjectLocation> expectedCloudLocations = ImmutableList.of(CLOUD_OBJECT_LOCATION_1);
|
||||
List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
|
||||
Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
|
||||
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, AzureInputSource.MAX_LISTING_LENGTH)).andReturn(
|
||||
azureCloudBlobIterable);
|
||||
EasyMock.expect(azureCloudBlobIterable.spliterator())
|
||||
.andReturn(Spliterators.spliteratorUnknownSize(expectedCloudBlobsIterator, 0));
|
||||
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
|
||||
.andReturn(CLOUD_OBJECT_LOCATION_1);
|
||||
replayAll();
|
||||
|
||||
azureInputSource = new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
EMPTY_URIS,
|
||||
prefixes,
|
||||
EMPTY_OBJECTS
|
||||
);
|
||||
|
||||
Stream<InputSplit<CloudObjectLocation>> cloudObjectStream = azureInputSource.getPrefixesSplitStream();
|
||||
|
||||
List<CloudObjectLocation> actualCloudLocationList = cloudObjectStream.map(split -> split.get())
|
||||
.collect(Collectors.toList());
|
||||
verifyAll();
|
||||
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_withSplit_constructsExpectedInputSource()
|
||||
{
|
||||
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
|
||||
EasyMock.expect(inputSplit.get()).andReturn(CLOUD_OBJECT_LOCATION_1);
|
||||
replayAll();
|
||||
|
||||
azureInputSource = new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
EMPTY_URIS,
|
||||
prefixes,
|
||||
EMPTY_OBJECTS
|
||||
);
|
||||
|
||||
SplittableInputSource<CloudObjectLocation> newInputSource = azureInputSource.withSplit(inputSplit);
|
||||
Assert.assertTrue(newInputSource.isSplittable());
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_toString_returnsExpectedString()
|
||||
{
|
||||
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
|
||||
azureInputSource = new AzureInputSource(
|
||||
storage,
|
||||
entityFactory,
|
||||
azureCloudBlobIterableFactory,
|
||||
azureCloudBlobToLocationConverter,
|
||||
EMPTY_URIS,
|
||||
prefixes,
|
||||
EMPTY_OBJECTS
|
||||
);
|
||||
|
||||
String actualToString = azureInputSource.toString();
|
||||
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[]}", actualToString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void abidesEqualsContract()
|
||||
{
|
||||
EqualsVerifier.forClass(AzureInputSource.class)
|
||||
.usingGetClass()
|
||||
.withPrefabValues(Logger.class, new Logger(AzureStorage.class), new Logger(AzureStorage.class))
|
||||
.withNonnullFields("storage")
|
||||
.withNonnullFields("entityFactory")
|
||||
.withNonnullFields("azureCloudBlobIterableFactory")
|
||||
.withNonnullFields("azureCloudBlobToLocationConverter")
|
||||
.verify();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup()
|
||||
{
|
||||
resetAll();
|
||||
}
|
||||
}
|
|
@ -30,16 +30,18 @@ import java.net.URISyntaxException;
|
|||
|
||||
public class AzureByteSourceTest extends EasyMockSupport
|
||||
{
|
||||
private static final long NO_OFFSET = 0L;
|
||||
private static final long OFFSET = 10L;
|
||||
|
||||
@Test
|
||||
public void openStreamTest() throws IOException, URISyntaxException, StorageException
|
||||
public void test_openStream_withoutOffset_succeeds() throws IOException, URISyntaxException, StorageException
|
||||
{
|
||||
final String containerName = "container";
|
||||
final String blobPath = "/path/to/file";
|
||||
AzureStorage azureStorage = createMock(AzureStorage.class);
|
||||
InputStream stream = createMock(InputStream.class);
|
||||
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(containerName, blobPath)).andReturn(stream);
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andReturn(stream);
|
||||
|
||||
replayAll();
|
||||
|
||||
|
@ -50,6 +52,25 @@ public class AzureByteSourceTest extends EasyMockSupport
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_openStream_withOffset_succeeds() throws IOException, URISyntaxException, StorageException
|
||||
{
|
||||
final String containerName = "container";
|
||||
final String blobPath = "/path/to/file";
|
||||
AzureStorage azureStorage = createMock(AzureStorage.class);
|
||||
InputStream stream = createMock(InputStream.class);
|
||||
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(OFFSET, containerName, blobPath)).andReturn(stream);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
|
||||
|
||||
byteSource.openStream(10L);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void openStreamWithRecoverableErrorTest() throws URISyntaxException, StorageException, IOException
|
||||
{
|
||||
|
@ -57,7 +78,7 @@ public class AzureByteSourceTest extends EasyMockSupport
|
|||
final String blobPath = "/path/to/file";
|
||||
AzureStorage azureStorage = createMock(AzureStorage.class);
|
||||
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(containerName, blobPath)).andThrow(
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
|
||||
new StorageException(
|
||||
"",
|
||||
"",
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AzureCloudBlobHolderToCloudObjectLocationConverterTest extends EasyMockSupport
|
||||
{
|
||||
private static final String CONTAINER1 = "container1";
|
||||
private static final String BLOB1 = "blob1";
|
||||
|
||||
private CloudBlobHolder cloudBlob;
|
||||
|
||||
private AzureCloudBlobHolderToCloudObjectLocationConverter converter;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
cloudBlob = createMock(CloudBlobHolder.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_createCloudObjectLocation_returnsExpectedLocation() throws Exception
|
||||
{
|
||||
EasyMock.expect(cloudBlob.getContainerName()).andReturn(CONTAINER1);
|
||||
EasyMock.expect(cloudBlob.getName()).andReturn(BLOB1);
|
||||
replayAll();
|
||||
|
||||
CloudObjectLocation expectedLocation = new CloudObjectLocation(CONTAINER1, BLOB1);
|
||||
converter = new AzureCloudBlobHolderToCloudObjectLocationConverter();
|
||||
CloudObjectLocation actualLocation = converter.createCloudObjectLocation(cloudBlob);
|
||||
|
||||
Assert.assertEquals(expectedLocation, actualLocation);
|
||||
verifyAll();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,338 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.microsoft.azure.storage.ResultContinuation;
|
||||
import com.microsoft.azure.storage.ResultSegment;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
public class AzureCloudBlobIteratorTest extends EasyMockSupport
|
||||
{
|
||||
private static final String AZURE = "azure";
|
||||
private static final String CONTAINER1 = "container1";
|
||||
private static final String PREFIX_ONLY_CLOUD_BLOBS = "prefixOnlyCloudBlobs";
|
||||
private static final String PREFIX_WITH_NO_BLOBS = "prefixWithNoBlobs";
|
||||
private static final String PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES = "prefixWithCloudBlobsAndDirectories";
|
||||
private static final URI PREFIX_ONLY_CLOUD_BLOBS_URI;
|
||||
private static final URI PREFIX_WITH_NO_BLOBS_URI;
|
||||
private static final URI PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI;
|
||||
private static final List<URI> EMPTY_URI_PREFIXES = ImmutableList.of();
|
||||
private static final List<URI> PREFIXES;
|
||||
private static final int MAX_LISTING_LENGTH = 10;
|
||||
private static final int MAX_TRIES = 2;
|
||||
private static final StorageException RETRYABLE_EXCEPTION = new StorageException("", "", new IOException());
|
||||
private static final URISyntaxException NON_RETRYABLE_EXCEPTION = new URISyntaxException("", "");
|
||||
|
||||
private AzureStorage storage;
|
||||
private ListBlobItemHolderFactory blobItemDruidFactory;
|
||||
private AzureAccountConfig config;
|
||||
private ResultSegment<ListBlobItem> resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1;
|
||||
private ResultSegment<ListBlobItem> resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2;
|
||||
private ResultSegment<ListBlobItem> resultSegmentPrefixWithNoBlobs;
|
||||
private ResultSegment<ListBlobItem> resultSegmentPrefixWithCloudBlobsAndDirectories;
|
||||
|
||||
private ResultContinuation resultContinuationPrefixOnlyCloudBlobs = new ResultContinuation();
|
||||
private ResultContinuation nullResultContinuationToken = null;
|
||||
|
||||
private ListBlobItem blobItemPrefixWithOnlyCloudBlobs1;
|
||||
private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs1;
|
||||
private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs1;
|
||||
|
||||
private ListBlobItem blobItemPrefixWithOnlyCloudBlobs2;
|
||||
private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs2;
|
||||
private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs2;
|
||||
|
||||
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories1;
|
||||
private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories;
|
||||
|
||||
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories2;
|
||||
private ListBlobItemHolder cloudBlobItemPrefixWithCloudBlobsAndDirectories;
|
||||
private CloudBlobHolder cloudBlobDruidPrefixWithCloudBlobsAndDirectories;
|
||||
|
||||
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories3;
|
||||
private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories3;
|
||||
|
||||
|
||||
private AzureCloudBlobIterator azureCloudBlobIterator;
|
||||
|
||||
static {
|
||||
try {
|
||||
PREFIX_ONLY_CLOUD_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_ONLY_CLOUD_BLOBS);
|
||||
PREFIX_WITH_NO_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_WITH_NO_BLOBS);
|
||||
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI = new URI(AZURE
|
||||
+ "://"
|
||||
+ CONTAINER1
|
||||
+ "/"
|
||||
+ PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES);
|
||||
PREFIXES = ImmutableList.of(
|
||||
PREFIX_ONLY_CLOUD_BLOBS_URI,
|
||||
PREFIX_WITH_NO_BLOBS_URI,
|
||||
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
storage = createMock(AzureStorage.class);
|
||||
config = createMock(AzureAccountConfig.class);
|
||||
blobItemDruidFactory = createMock(ListBlobItemHolderFactory.class);
|
||||
|
||||
resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1 = createMock(ResultSegment.class);
|
||||
resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2 = createMock(ResultSegment.class);
|
||||
resultSegmentPrefixWithNoBlobs = createMock(ResultSegment.class);
|
||||
resultSegmentPrefixWithCloudBlobsAndDirectories = createMock(ResultSegment.class);
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
|
||||
|
||||
blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class);
|
||||
|
||||
blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class);
|
||||
|
||||
blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class);
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
|
||||
|
||||
blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class);
|
||||
cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class);
|
||||
|
||||
blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class);
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_hasNext_noBlobs_returnsFalse()
|
||||
{
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
EMPTY_URI_PREFIXES,
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
boolean hasNext = azureCloudBlobIterator.hasNext();
|
||||
Assert.assertFalse(hasNext);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpectedBlobs() throws Exception
|
||||
{
|
||||
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1);
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs1);
|
||||
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2);
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn(
|
||||
cloudBlobItemPrefixWithOnlyCloudBlobs2);
|
||||
|
||||
EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(false);
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories1)).andReturn(
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories);
|
||||
|
||||
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true);
|
||||
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn(
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories);
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn(
|
||||
cloudBlobItemPrefixWithCloudBlobsAndDirectories);
|
||||
|
||||
EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories3.isCloudBlob()).andReturn(false);
|
||||
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories3)).andReturn(
|
||||
directoryItemPrefixWithCloudBlobsAndDirectories3);
|
||||
|
||||
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>();
|
||||
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1);
|
||||
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs2 = new ArrayList<>();
|
||||
resultBlobItemsPrefixWithOnlyCloudBlobs2.add(blobItemPrefixWithOnlyCloudBlobs2);
|
||||
ArrayList<ListBlobItem> resultBlobItemsPrefixWithNoBlobs = new ArrayList<>();
|
||||
ArrayList<ListBlobItem> resultBlobItemsPrefixWithCloudBlobsAndDirectories = new ArrayList<>();
|
||||
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories1);
|
||||
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories2);
|
||||
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories3);
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken())
|
||||
.andReturn(resultContinuationPrefixOnlyCloudBlobs);
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults())
|
||||
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1);
|
||||
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getContinuationToken()).andReturn(nullResultContinuationToken);
|
||||
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getResults())
|
||||
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs2);
|
||||
|
||||
EasyMock.expect(resultSegmentPrefixWithNoBlobs.getContinuationToken()).andReturn(nullResultContinuationToken);
|
||||
EasyMock.expect(resultSegmentPrefixWithNoBlobs.getResults()).andReturn(resultBlobItemsPrefixWithNoBlobs);
|
||||
|
||||
EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getContinuationToken())
|
||||
.andReturn(nullResultContinuationToken);
|
||||
EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getResults())
|
||||
.andReturn(resultBlobItemsPrefixWithCloudBlobsAndDirectories);
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_ONLY_CLOUD_BLOBS,
|
||||
nullResultContinuationToken,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andThrow(RETRYABLE_EXCEPTION);
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_ONLY_CLOUD_BLOBS,
|
||||
nullResultContinuationToken,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1);
|
||||
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_ONLY_CLOUD_BLOBS,
|
||||
resultContinuationPrefixOnlyCloudBlobs,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2);
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_WITH_NO_BLOBS,
|
||||
nullResultContinuationToken,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andReturn(resultSegmentPrefixWithNoBlobs);
|
||||
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
CONTAINER1,
|
||||
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES,
|
||||
nullResultContinuationToken,
|
||||
MAX_LISTING_LENGTH
|
||||
)).andReturn(resultSegmentPrefixWithCloudBlobsAndDirectories);
|
||||
|
||||
replayAll();
|
||||
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
PREFIXES,
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
|
||||
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs1,
|
||||
cloudBlobDruidPrefixWithOnlyCloudBlobs2,
|
||||
cloudBlobDruidPrefixWithCloudBlobsAndDirectories
|
||||
);
|
||||
List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
|
||||
while (azureCloudBlobIterator.hasNext()) {
|
||||
actualBlobItems.add(azureCloudBlobIterator.next());
|
||||
}
|
||||
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
|
||||
Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems));
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = NoSuchElementException.class)
|
||||
public void test_next_emptyPrefixes_throwsNoSuchElementException()
|
||||
{
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
EMPTY_URI_PREFIXES,
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
azureCloudBlobIterator.next();
|
||||
}
|
||||
|
||||
@Test(expected = RE.class)
|
||||
public void test_fetchNextBatch_moreThanMaxTriesRetryableExceptionsThrownInStorage_throwsREException() throws Exception
|
||||
{
|
||||
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyObject(),
|
||||
EasyMock.anyInt()
|
||||
)).andThrow(RETRYABLE_EXCEPTION);
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyObject(),
|
||||
EasyMock.anyInt()
|
||||
)).andThrow(RETRYABLE_EXCEPTION);
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
PREFIXES,
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
}
|
||||
|
||||
@Test(expected = RE.class)
|
||||
public void test_fetchNextBatch_nonRetryableExceptionThrownInStorage_throwsREException() throws Exception
|
||||
{
|
||||
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
|
||||
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyString(),
|
||||
EasyMock.anyObject(),
|
||||
EasyMock.anyInt()
|
||||
)).andThrow(NON_RETRYABLE_EXCEPTION);
|
||||
azureCloudBlobIterator = new AzureCloudBlobIterator(
|
||||
storage,
|
||||
blobItemDruidFactory,
|
||||
config,
|
||||
PREFIXES,
|
||||
MAX_LISTING_LENGTH
|
||||
);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup()
|
||||
{
|
||||
resetAll();
|
||||
}
|
||||
}
|
|
@ -42,11 +42,13 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
private static final String BLOB_PATH = "path/to/storage/index.zip";
|
||||
private static final String BLOB_PATH_HADOOP = AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip";
|
||||
private AzureStorage azureStorage;
|
||||
private AzureByteSourceFactory byteSourceFactory;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
azureStorage = createMock(AzureStorage.class);
|
||||
byteSourceFactory = createMock(AzureByteSourceFactory.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -59,11 +61,12 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
try {
|
||||
final InputStream zipStream = new FileInputStream(pulledFile);
|
||||
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
|
||||
|
||||
FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, toDir);
|
||||
|
||||
|
@ -90,11 +93,12 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
try {
|
||||
final InputStream zipStream = new FileInputStream(pulledFile);
|
||||
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andReturn(zipStream);
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
|
||||
|
||||
FileUtils.FileCopyResult result = puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH_HADOOP, toDir);
|
||||
|
||||
|
@ -118,7 +122,8 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
|
||||
final File outDir = FileUtils.createTempDir();
|
||||
try {
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andThrow(
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
|
||||
new URISyntaxException(
|
||||
"error",
|
||||
"error",
|
||||
|
@ -128,7 +133,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
|
||||
|
||||
puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
|
||||
}
|
||||
|
@ -149,13 +154,14 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
|
|||
|
||||
final File outDir = FileUtils.createTempDir();
|
||||
try {
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER_NAME, BLOB_PATH)).andThrow(
|
||||
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
|
||||
EasyMock.expect(azureStorage.getBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
|
||||
new StorageException(null, null, 0, null, null)
|
||||
).atLeastOnce();
|
||||
|
||||
replayAll();
|
||||
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);
|
||||
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);
|
||||
|
||||
puller.getSegmentFiles(CONTAINER_NAME, BLOB_PATH, outDir);
|
||||
|
||||
|
|
|
@ -27,23 +27,42 @@ import com.google.inject.Key;
|
|||
import com.google.inject.Module;
|
||||
import com.microsoft.azure.storage.StorageCredentials;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||
import org.apache.druid.data.input.azure.AzureEntityFactory;
|
||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||
import org.apache.druid.guice.DruidGuiceExtensions;
|
||||
import org.apache.druid.guice.JsonConfigurator;
|
||||
import org.apache.druid.guice.LazySingleton;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
|
||||
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.validation.Validation;
|
||||
import javax.validation.Validator;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
import java.util.Properties;
|
||||
|
||||
public class AzureStorageDruidModuleTest
|
||||
public class AzureStorageDruidModuleTest extends EasyMockSupport
|
||||
{
|
||||
private static final String AZURE_ACCOUNT_NAME;
|
||||
private static final String AZURE_ACCOUNT_KEY;
|
||||
private static final String AZURE_CONTAINER;
|
||||
private static final String PATH = "path";
|
||||
private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE = ImmutableList.of();
|
||||
private static final Properties PROPERTIES;
|
||||
|
||||
private CloudObjectLocation cloudObjectLocation1;
|
||||
private CloudObjectLocation cloudObjectLocation2;
|
||||
private ListBlobItem blobItem1;
|
||||
private ListBlobItem blobItem2;
|
||||
|
||||
|
||||
private Injector injector;
|
||||
|
||||
static {
|
||||
|
@ -52,20 +71,26 @@ public class AzureStorageDruidModuleTest
|
|||
AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
|
||||
.encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
|
||||
AZURE_CONTAINER = "azureContainer1";
|
||||
PROPERTIES = initializePropertes();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
cloudObjectLocation1 = createMock(CloudObjectLocation.class);
|
||||
cloudObjectLocation2 = createMock(CloudObjectLocation.class);
|
||||
blobItem1 = createMock(ListBlobItem.class);
|
||||
blobItem2 = createMock(ListBlobItem.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getBlobClient_expectedClient()
|
||||
{
|
||||
final Properties props = new Properties();
|
||||
props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
|
||||
props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
|
||||
props.put("druid.azure.container", AZURE_CONTAINER);
|
||||
injector = makeInjectorWithProperties(props);
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
|
||||
|
||||
Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
|
||||
|
@ -81,11 +106,7 @@ public class AzureStorageDruidModuleTest
|
|||
@Test
|
||||
public void test_getAzureStorageContainer_expectedClient()
|
||||
{
|
||||
final Properties props = new Properties();
|
||||
props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
|
||||
props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
|
||||
props.put("druid.azure.container", AZURE_CONTAINER);
|
||||
injector = makeInjectorWithProperties(props);
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
|
||||
|
||||
Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
|
||||
|
@ -101,6 +122,83 @@ public class AzureStorageDruidModuleTest
|
|||
Assert.assertSame(cloudBlobClient, azureStorage.getCloudBlobClient());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getAzureCloudBlobToLocationConverter_expectedConverted()
|
||||
{
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobLocationConverter1 = injector.getInstance(
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter.class);
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobLocationConverter2 = injector.getInstance(
|
||||
AzureCloudBlobHolderToCloudObjectLocationConverter.class);
|
||||
Assert.assertSame(azureCloudBlobLocationConverter1, azureCloudBlobLocationConverter2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getAzureByteSourceFactory_canCreateAzureByteSource()
|
||||
{
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureByteSourceFactory factory = injector.getInstance(AzureByteSourceFactory.class);
|
||||
Object object1 = factory.create("container1", "blob1");
|
||||
Object object2 = factory.create("container2", "blob2");
|
||||
Assert.assertNotNull(object1);
|
||||
Assert.assertNotNull(object2);
|
||||
Assert.assertNotSame(object1, object2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getAzureEntityFactory_canCreateAzureEntity()
|
||||
{
|
||||
EasyMock.expect(cloudObjectLocation1.getBucket()).andReturn(AZURE_CONTAINER);
|
||||
EasyMock.expect(cloudObjectLocation2.getBucket()).andReturn(AZURE_CONTAINER);
|
||||
EasyMock.expect(cloudObjectLocation1.getPath()).andReturn(PATH);
|
||||
EasyMock.expect(cloudObjectLocation2.getPath()).andReturn(PATH);
|
||||
replayAll();
|
||||
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureEntityFactory factory = injector.getInstance(AzureEntityFactory.class);
|
||||
Object object1 = factory.create(cloudObjectLocation1);
|
||||
Object object2 = factory.create(cloudObjectLocation2);
|
||||
Assert.assertNotNull(object1);
|
||||
Assert.assertNotNull(object2);
|
||||
Assert.assertNotSame(object1, object2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getAzureCloudBlobIteratorFactory_canCreateAzureCloudBlobIterator()
|
||||
{
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureCloudBlobIteratorFactory factory = injector.getInstance(AzureCloudBlobIteratorFactory.class);
|
||||
Object object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
|
||||
Object object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
|
||||
Assert.assertNotNull(object1);
|
||||
Assert.assertNotNull(object2);
|
||||
Assert.assertNotSame(object1, object2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getAzureCloudBlobIterableFactory_canCreateAzureCloudBlobIterable()
|
||||
{
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
AzureCloudBlobIterableFactory factory = injector.getInstance(AzureCloudBlobIterableFactory.class);
|
||||
AzureCloudBlobIterable object1 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
|
||||
AzureCloudBlobIterable object2 = factory.create(EMPTY_PREFIXES_ITERABLE, 10);
|
||||
Assert.assertNotNull(object1);
|
||||
Assert.assertNotNull(object2);
|
||||
Assert.assertNotSame(object1, object2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_getListBlobItemDruidFactory_canCreateListBlobItemDruid()
|
||||
{
|
||||
injector = makeInjectorWithProperties(PROPERTIES);
|
||||
ListBlobItemHolderFactory factory = injector.getInstance(ListBlobItemHolderFactory.class);
|
||||
ListBlobItemHolder object1 = factory.create(blobItem1);
|
||||
ListBlobItemHolder object2 = factory.create(blobItem2);
|
||||
Assert.assertNotNull(object1);
|
||||
Assert.assertNotNull(object2);
|
||||
Assert.assertNotSame(object1, object2);
|
||||
}
|
||||
|
||||
private Injector makeInjectorWithProperties(final Properties props)
|
||||
{
|
||||
return Guice.createInjector(
|
||||
|
@ -119,4 +217,13 @@ public class AzureStorageDruidModuleTest
|
|||
new AzureStorageDruidModule()
|
||||
));
|
||||
}
|
||||
|
||||
private static Properties initializePropertes()
|
||||
{
|
||||
final Properties props = new Properties();
|
||||
props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
|
||||
props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
|
||||
props.put("druid.azure.container", AZURE_CONTAINER);
|
||||
return props;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.storage.azure;
|
||||
|
||||
import org.apache.druid.data.input.azure.AzureInputSource;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
public class AzureUtilsTest
|
||||
{
|
||||
private static final String CONTAINER_NAME = "container1";
|
||||
private static final String BLOB_NAME = "blob1";
|
||||
private static final String BLOB_PATH_WITH_LEADING_SLASH = "/" + BLOB_NAME;
|
||||
private static final String BLOB_PATH_WITH_LEADING_AZURE_PREFIX = AzureUtils.AZURE_STORAGE_HOST_ADDRESS
|
||||
+ "/"
|
||||
+ BLOB_NAME;
|
||||
private static final URI URI_WITH_PATH_WITH_LEADING_SLASH;
|
||||
|
||||
static {
|
||||
try {
|
||||
URI_WITH_PATH_WITH_LEADING_SLASH = new URI(AzureInputSource.SCHEME
|
||||
+ "://"
|
||||
+ CONTAINER_NAME
|
||||
+ BLOB_PATH_WITH_LEADING_SLASH);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_extractAzureKey_pathHasLeadingSlash_returnsPathWithLeadingSlashRemoved()
|
||||
{
|
||||
String extractedKey = AzureUtils.extractAzureKey(URI_WITH_PATH_WITH_LEADING_SLASH);
|
||||
Assert.assertEquals(BLOB_NAME, extractedKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_maybeRemoveAzurePathPrefix_pathHasLeadingAzurePathPrefix_returnsPathWithLeadingAzurePathRemoved()
|
||||
{
|
||||
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_PATH_WITH_LEADING_AZURE_PREFIX);
|
||||
Assert.assertEquals(BLOB_NAME, path);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_maybeRemoveAzurePathPrefix_pathDoesNotHaveAzurePathPrefix__returnsPathWithLeadingAzurePathRemoved()
|
||||
{
|
||||
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME);
|
||||
Assert.assertEquals(BLOB_NAME, path);
|
||||
}
|
||||
}
|
|
@ -41,8 +41,6 @@ import java.util.stream.StreamSupport;
|
|||
|
||||
public class S3InputSource extends CloudObjectInputSource<S3Entity>
|
||||
{
|
||||
private static final int MAX_LISTING_LENGTH = 1024;
|
||||
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
|
||||
@JsonCreator
|
||||
|
|
|
@ -308,7 +308,7 @@ notices:
|
|||
Google Guice - Extensions - Servlet
|
||||
Copyright 2006-2016 Google, Inc.
|
||||
- guice-assistedinject: |
|
||||
Google Guice - Extensions - AssistedInect
|
||||
Google Guice - Extensions - AssistedInject
|
||||
Copyright 2006-2016 Google, Inc.
|
||||
|
||||
---
|
||||
|
|
Loading…
Reference in New Issue