Add more support for Azure Blob Store

Azure Blob Store support for Task Logs and a firehose for data ingestion
This commit is contained in:
Zak Kristjanson 2015-07-09 15:49:57 -07:00
parent 3ce32f97ec
commit 0bda7af52c
12 changed files with 555 additions and 12 deletions

docs/content/configuration/indexing-service.md

@@ -19,11 +19,11 @@ The indexing service uses several of the global configs in [Configuration](../co
#### Task Logging
If you are running the indexing service in remote mode, the task logs must S3 or HDFS.
If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, or HDFS.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.type`|Choices:noop, s3, hdfs, file. Where to store task logs|file|
|`druid.indexer.logs.type`|Choices: noop, s3, azure, hdfs, file. Where to store task logs.|file|
##### File Task Logs
@@ -42,6 +42,16 @@ Store task logs in S3.
|`druid.indexer.logs.s3Bucket`|S3 bucket name.|none|
|`druid.indexer.logs.s3Prefix`|S3 key prefix.|none|
#### Azure Blob Store Task Logs
Store task logs in Azure Blob Store.
Note: this uses the same storage account as the Azure deep storage module.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.container`|The Azure Blob Store container to write logs to.|none|
|`druid.indexer.logs.prefix`|The path prefix to prepend to task log paths.|none|
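For example, a minimal `runtime.properties` sketch for Azure task logs (the container and prefix values below are placeholders, not defaults):
```
druid.indexer.logs.type=azure
druid.indexer.logs.container=druid-logs
druid.indexer.logs.prefix=druid/logs
```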
##### HDFS Task Logs
Store task logs in HDFS.

docs/content/ingestion/firehose.md

@@ -27,6 +27,33 @@ This firehose acts as a Kafka consumer and ingests data from Kafka.
This firehose ingests events from a predefined list of S3 objects.
#### StaticAzureBlobStoreFirehose
This firehose ingests events from a predefined list of Azure Blob Store objects, much like the StaticS3Firehose.
Data must be newline-delimited, with one JSON object per line, and is parsed according to the `InputRowParser` configuration.
The storage account is shared with the one used for Azure deep storage, but the blobs may live in a different container.
As with the S3 firehose, a blob is assumed to be gzipped if its path ends in `.gz`.
Sample spec:
```json
"firehose" : {
"type" : "static-azure-blobstore",
"blobs": [
{
"container": "container",
"path": "/path/to/your/file.json"
},
{
"container": "anothercontainer",
"path": "/another/path.json"
}
]
}
```
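Note: a leading slash in `path` is stripped before the blob is fetched, so `/path/to/your/file.json` and `path/to/your/file.json` resolve to the same blob.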
#### TwitterSpritzerFirehose
See [Examples](../tutorials/examples.html). This firehose connects directly to the twitter spritzer data stream.

io/druid/firehose/azure/AzureBlob.java

@@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.azure;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
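/** Identifies a single blob (container + path) to be read by the static Azure Blob Store firehose. */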
public class AzureBlob {
@JsonProperty
@NotNull
private String container = null;
@JsonProperty
@NotNull
private String path = null;
public AzureBlob() {
}
public AzureBlob(String container, String path) {
this.container = container;
this.path = path;
}
public String getContainer() {
return container;
}
public String getPath() {
return path;
}
}

io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java

@@ -0,0 +1,124 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.azure;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.CompressionUtils;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.storage.azure.AzureByteSource;
import io.druid.storage.azure.AzureStorage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package
*/
public class StaticAzureBlobStoreFirehoseFactory implements FirehoseFactory<StringInputRowParser> {
private static final Logger log = new Logger(StaticAzureBlobStoreFirehoseFactory.class);
private final AzureStorage azureStorage;
private final List<AzureBlob> blobs;
@JsonCreator
public StaticAzureBlobStoreFirehoseFactory(
@JacksonInject("azureStorage") AzureStorage azureStorage,
@JsonProperty("blobs") AzureBlob[] blobs
) {
this.azureStorage = azureStorage;
this.blobs = ImmutableList.copyOf(blobs);
}
@JsonProperty
public List<AzureBlob> getBlobs() {
return blobs;
}
@Override
public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException {
Preconditions.checkNotNull(azureStorage, "null azureStorage");
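// Blobs are consumed in configured order; each one is turned into a single LineIterator below.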
final LinkedList<AzureBlob> objectQueue = Lists.newLinkedList(blobs);
return new FileIteratingFirehose(
new Iterator<LineIterator>() {
@Override
public boolean hasNext() {
return !objectQueue.isEmpty();
}
@Override
public LineIterator next() {
final AzureBlob nextBlob = objectQueue.poll();
final String container = nextBlob.getContainer();
final String path = nextBlob.getPath().startsWith("/")
? nextBlob.getPath().substring(1)
: nextBlob.getPath();
try {
final InputStream innerInputStream = new AzureByteSource(azureStorage, container, path).openStream();
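// Transparently decompress blobs whose path ends in .gz, mirroring the S3 firehose.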
final InputStream outerInputStream = path.endsWith(".gz")
? CompressionUtils.gzipInputStream(innerInputStream)
: innerInputStream;
return IOUtils.lineIterator(
new BufferedReader(
new InputStreamReader(outerInputStream, Charsets.UTF_8)
)
);
} catch (Exception e) {
log.error(e,
"Exception opening container[%s] blob[%s]",
container,
path
);
throw Throwables.propagate(e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
stringInputRowParser
);
}
}

io/druid/storage/azure/AzureDataSegmentPusher.java

@@ -22,7 +22,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import com.microsoft.azure.storage.StorageException;
import io.druid.segment.SegmentUtils;
@@ -94,11 +93,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
}
public <T> T retryAzureOperation(Callable<T> f, int maxTries) throws Exception
{
return RetryUtils.retry(f, AzureUtils.AZURE_RETRY, maxTries);
}
public DataSegment uploadDataSegment(
DataSegment segment,
final int version,
@@ -146,7 +140,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final Map<String, String> azurePaths = getAzurePaths(segment);
try {
return retryAzureOperation(
return AzureUtils.retryAzureOperation(
new Callable<DataSegment>()
{
@Override

io/druid/storage/azure/AzureStorage.java

@@ -89,6 +89,16 @@ public class AzureStorage
}
}
public CloudBlockBlob getBlob(final String containerName, final String blobPath)
throws URISyntaxException, StorageException {
return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
}
public long getBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException {
return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength();
}
public InputStream getBlobInputStream(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
@@ -96,4 +106,8 @@ public class AzureStorage
return container.getBlockBlobReference(blobPath).openInputStream();
}
public boolean getBlobExists(String container, String blobPath)
throws URISyntaxException, StorageException {
return getCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
}
}

io/druid/storage/azure/AzureStorageDruidModule.java

@@ -19,14 +19,18 @@ package io.druid.storage.azure;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import io.druid.firehose.azure.StaticAzureBlobStoreFirehoseFactory;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.annotations.Json;
import io.druid.initialization.DruidModule;
import java.net.URISyntaxException;
@@ -64,7 +68,9 @@ public class AzureStorageDruidModule implements DruidModule
{
context.registerSubtypes(AzureLoadSpec.class);
}
}
},
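// Register the firehose under the "static-azure-blobstore" type name used in ingestion specs.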
new SimpleModule().registerSubtypes(
new NamedType(StaticAzureBlobStoreFirehoseFactory.class, "static-azure-blobstore"))
);
}
@@ -84,6 +90,10 @@ public class AzureStorageDruidModule implements DruidModule
Binders.dataSegmentKillerBinder(binder)
.addBinding(SCHEME)
.to(AzureDataSegmentKiller.class).in(LazySingleton.class);
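// Expose Azure as a task log backend, selected via druid.indexer.logs.type=azure.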
Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", AzureTaskLogsConfig.class);
binder.bind(AzureTaskLogs.class).in(LazySingleton.class);
}
@Provides

io/druid/storage/azure/AzureTaskLogs.java

@@ -0,0 +1,112 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.azure;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.microsoft.azure.storage.StorageException;
import io.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
public class AzureTaskLogs implements TaskLogs {
private static final Logger log = new Logger(AzureTaskLogs.class);
private final AzureTaskLogsConfig config;
private final AzureStorage azureStorage;
@Inject
public AzureTaskLogs(AzureTaskLogsConfig config, AzureStorage azureStorage) {
this.config = config;
this.azureStorage = azureStorage;
}
@Override
public void pushTaskLog(final String taskid, final File logFile) throws IOException {
final String taskKey = getTaskLogKey(taskid);
log.info("Pushing task log %s to: %s", logFile, taskKey);
try {
AzureUtils.retryAzureOperation(
new Callable<Void>() {
@Override
public Void call() throws Exception {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
return null;
}
},
config.getMaxTries()
);
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException {
final String container = config.getContainer();
final String taskKey = getTaskLogKey(taskid);
try {
if (!azureStorage.getBlobExists(container, taskKey)) return Optional.absent();
return Optional.<ByteSource>of(
new ByteSource() {
@Override
public InputStream openStream() throws IOException {
try {
final long start;
final long length = azureStorage.getBlobLength(container, taskKey);
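// offset > 0 streams from that byte onward; offset < 0 streams the last |offset| bytes;
// an out-of-range offset falls back to streaming the whole log.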
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
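// Note: InputStream.skip() may skip fewer bytes than requested; a stricter implementation would loop until 'start' bytes are consumed.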
stream.skip(start);
return stream;
} catch(Exception e) {
throw new IOException(e);
}
}
}
);
} catch (StorageException | URISyntaxException e) {
throw new IOException(String.format("Failed to stream logs from: %s", taskKey), e);
}
}
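// Task logs are stored at <prefix>/<taskid>/log within the configured container.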
private String getTaskLogKey(String taskid) {
return String.format("%s/%s/log", config.getPrefix(), taskid);
}
}

io/druid/storage/azure/AzureTaskLogsConfig.java

@@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.azure;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
public class AzureTaskLogsConfig {
@JsonProperty
@NotNull
private String container = null;
@JsonProperty
@NotNull
private String prefix = null;
@JsonProperty
@Min(1)
private int maxTries = 3;
public AzureTaskLogsConfig() {
}
public AzureTaskLogsConfig(String container, String prefix, int maxTries) {
this.container = container;
this.prefix = prefix;
this.maxTries = maxTries;
}
public String getContainer() {
return container;
}
public String getPrefix() {
return prefix;
}
public int getMaxTries() {
return maxTries;
}
}

io/druid/storage/azure/AzureUtils.java

@@ -18,10 +18,12 @@
package io.druid.storage.azure;
import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils;
import com.microsoft.azure.storage.StorageException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
public class AzureUtils
{
@@ -48,5 +50,7 @@ public class AzureUtils
}
};
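// Runs f, retrying up to maxTries times on exceptions matched by the AZURE_RETRY predicate.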
public static <T> T retryAzureOperation(Callable<T> f, int maxTries) throws Exception {
return RetryUtils.retry(f, AZURE_RETRY, maxTries);
}
}

META-INF/services/io.druid.initialization.DruidModule

@@ -1 +1 @@
io.druid.storage.azure.AzureStorageDruidModule
io.druid.storage.azure.AzureStorageDruidModule

io/druid/storage/azure/AzureTaskLogsTest.java

@@ -0,0 +1,140 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.storage.azure;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.StringWriter;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
public class AzureTaskLogsTest extends EasyMockSupport {
private static final String container = "test";
private static final String prefix = "test/log";
private static final String taskid = "taskid";
private static final AzureTaskLogsConfig azureTaskLogsConfig = new AzureTaskLogsConfig(container, prefix, 3);
private AzureStorage azureStorage;
private AzureTaskLogs azureTaskLogs;
@Before
public void before() {
azureStorage = createMock(AzureStorage.class);
azureTaskLogs = new AzureTaskLogs(azureTaskLogsConfig, azureStorage);
}
@Test
public void testPushTaskLog() throws Exception {
final File tmpDir = Files.createTempDir();
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log");
expectLastCall();
replayAll();
azureTaskLogs.pushTaskLog(taskid, logFile);
verifyAll();
} finally {
FileUtils.deleteDirectory(tmpDir);
}
}
@Test
public void testStreamTaskLogWithoutOffset() throws Exception {
final String testLog = "hello this is a log";
final String blobPath = prefix + "/" + taskid + "/log";
expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true);
expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length());
expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8)));
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(taskid, 0);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(testLog, writer.toString());
verifyAll();
}
@Test
public void testStreamTaskLogWithPositiveOffset() throws Exception {
final String testLog = "hello this is a log";
final String blobPath = prefix + "/" + taskid + "/log";
expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true);
expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length());
expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8)));
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(taskid, 5);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(testLog.substring(5), writer.toString());
verifyAll();
}
@Test
public void testStreamTaskLogWithNegativeOffset() throws Exception {
final String testLog = "hello this is a log";
final String blobPath = prefix + "/" + taskid + "/log";
expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true);
expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length());
expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn(
new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8)));
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(taskid, -3);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
Assert.assertEquals(testLog.substring(testLog.length() - 3), writer.toString());
verifyAll();
}
}