From 0bda7af52ce2a70f37e03af9becf0c0ac0811ef7 Mon Sep 17 00:00:00 2001 From: Zak Kristjanson Date: Thu, 9 Jul 2015 15:49:57 -0700 Subject: [PATCH] Add more support for Azure Blob Store Azure Blob Store support for Task Logs and a firehose for data ingestion --- .../content/configuration/indexing-service.md | 14 +- docs/content/ingestion/firehose.md | 27 ++++ .../io/druid/firehose/azure/AzureBlob.java | 49 ++++++ .../StaticAzureBlobStoreFirehoseFactory.java | 124 ++++++++++++++++ .../storage/azure/AzureDataSegmentPusher.java | 8 +- .../io/druid/storage/azure/AzureStorage.java | 14 ++ .../azure/AzureStorageDruidModule.java | 12 +- .../io/druid/storage/azure/AzureTaskLogs.java | 112 ++++++++++++++ .../storage/azure/AzureTaskLogsConfig.java | 59 ++++++++ .../io/druid/storage/azure/AzureUtils.java | 6 +- .../io.druid.initialization.DruidModule | 2 +- .../storage/azure/AzureTaskLogsTest.java | 140 ++++++++++++++++++ 12 files changed, 555 insertions(+), 12 deletions(-) create mode 100644 extensions/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java create mode 100644 extensions/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java create mode 100644 extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java create mode 100644 extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogsConfig.java create mode 100644 extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index afe4b83c2b4..96993765f74 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -19,11 +19,11 @@ The indexing service uses several of the global configs in [Configuration](../co #### Task Logging -If you are running the indexing service in remote mode, the task logs must S3 or HDFS. +If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store or HDFS. |Property|Description|Default| |--------|-----------|-------| -|`druid.indexer.logs.type`|Choices:noop, s3, hdfs, file. Where to store task logs|file| +|`druid.indexer.logs.type`|Choices:noop, s3, azure, hdfs, file. Where to store task logs|file| ##### File Task Logs @@ -42,6 +42,16 @@ Store task logs in S3. |`druid.indexer.logs.s3Bucket`|S3 bucket name.|none| |`druid.indexer.logs.s3Prefix`|S3 key prefix.|none| +#### Azure Blob Store Task Logs +Store task logs in Azure Blob Store. + +Note: this uses the same storage account as the deep storage module for azure. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.indexer.logs.container`|The Azure Blob Store container to write logs to|none| +|`druid.indexer.logs.prefix`|The path to prepend to logs|none| + ##### HDFS Task Logs Store task logs in HDFS. diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md index ca48729f706..e98ffa01765 100644 --- a/docs/content/ingestion/firehose.md +++ b/docs/content/ingestion/firehose.md @@ -27,6 +27,33 @@ This firehose acts as a Kafka consumer and ingests data from Kafka. This firehose ingests events from a predefined list of S3 objects. +#### StaticAzureBlobStoreFirehose + +This firehose ingests events, similar to the StaticS3Firehose, but from an Azure Blob Store. + +Data is newline delimited, with one JSON object per line and parsed as per the `InputRowParser` configuration. + +The storage account is shared with the one used for Azure deep storage functionality, but blobs can be in a different container. + +As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz + +Sample spec: +```json +"firehose" : { + "type" : "static-azure-blobstore", + "blobs": [ + { + "container": "container", + "path": "/path/to/your/file.json" + }, + { + "container": "anothercontainer", + "path": "/another/path.json" + } + ] +} +``` + #### TwitterSpritzerFirehose See [Examples](../tutorials/examples.html). This firehose connects directly to the twitter spritzer data stream. diff --git a/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java b/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java new file mode 100644 index 00000000000..1e308b71da4 --- /dev/null +++ b/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java @@ -0,0 +1,49 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.firehose.azure; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; + + +public class AzureBlob { + @JsonProperty + @NotNull + private String container = null; + + @JsonProperty + @NotNull + private String path = null; + + public AzureBlob() { + } + + public AzureBlob(String container, String path) { + this.container = container; + this.path = path; + } + + public String getContainer() { + return container; + } + + public String getPath() { + return path; + } +} diff --git a/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java new file mode 100644 index 00000000000..93a84414098 --- /dev/null +++ b/extensions/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -0,0 +1,124 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.firehose.azure; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.metamx.common.CompressionUtils; +import com.metamx.common.logger.Logger; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.impl.FileIteratingFirehose; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.storage.azure.AzureByteSource; +import io.druid.storage.azure.AzureStorage; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package + */ +public class StaticAzureBlobStoreFirehoseFactory implements FirehoseFactory { + private static final Logger log = new Logger(StaticAzureBlobStoreFirehoseFactory.class); + + private final AzureStorage azureStorage; + private final List blobs; + + @JsonCreator + public StaticAzureBlobStoreFirehoseFactory( + @JacksonInject("azureStorage") AzureStorage azureStorage, + @JsonProperty("blobs") AzureBlob[] blobs + ) { + this.azureStorage = azureStorage; + this.blobs = ImmutableList.copyOf(blobs); + } + + @JsonProperty + public List getBlobs() { + return blobs; + } + + @Override + public Firehose connect(StringInputRowParser stringInputRowParser) throws IOException { + Preconditions.checkNotNull(azureStorage, "null azureStorage"); + + final LinkedList objectQueue = Lists.newLinkedList(blobs); + + return new FileIteratingFirehose( + new Iterator() { + @Override + public boolean hasNext() { + return !objectQueue.isEmpty(); + } + + @Override + public LineIterator next() { + final AzureBlob nextURI = objectQueue.poll(); + + final String container = nextURI.getContainer(); + final String path = nextURI.getPath().startsWith("/") + ? nextURI.getPath().substring(1) + : nextURI.getPath(); + + try { + final InputStream innerInputStream = new AzureByteSource(azureStorage, container, path).openStream(); + + + final InputStream outerInputStream = path.endsWith(".gz") + ? CompressionUtils.gzipInputStream(innerInputStream) + : innerInputStream; + + return IOUtils.lineIterator( + new BufferedReader( + new InputStreamReader(outerInputStream, Charsets.UTF_8) + ) + ); + } catch (Exception e) { + log.error(e, + "Exception opening container[%s] blob[%s]", + container, + path + ); + + throw Throwables.propagate(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }, + stringInputRowParser + ); + } +} diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index a23ced6c938..9ecb915867e 100644 --- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -22,7 +22,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.metamx.common.CompressionUtils; -import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; import com.microsoft.azure.storage.StorageException; import io.druid.segment.SegmentUtils; @@ -94,11 +93,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher } - public T retryAzureOperation(Callable f, int maxTries) throws Exception - { - return RetryUtils.retry(f, AzureUtils.AZURE_RETRY, maxTries); - } - public DataSegment uploadDataSegment( DataSegment segment, final int version, @@ -146,7 +140,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher final Map azurePaths = getAzurePaths(segment); try { - return retryAzureOperation( + return AzureUtils.retryAzureOperation( new Callable() { @Override diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java index b91af1c2eb1..bd24e8781fa 100644 --- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java @@ -89,6 +89,16 @@ public class AzureStorage } } + public CloudBlockBlob getBlob(final String containerName, final String blobPath) + throws URISyntaxException, StorageException { + return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath); + } + + public long getBlobLength(final String containerName, final String blobPath) + throws URISyntaxException, StorageException { + return getCloudBlobContainer(containerName).getBlockBlobReference(blobPath).getProperties().getLength(); + } + public InputStream getBlobInputStream(final String containerName, final String blobPath) throws URISyntaxException, StorageException { @@ -96,4 +106,8 @@ public class AzureStorage return container.getBlockBlobReference(blobPath).openInputStream(); } + public boolean getBlobExists(String container, String blobPath) + throws URISyntaxException, StorageException { + return getCloudBlobContainer(container).getBlockBlobReference(blobPath).exists(); + } } diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java index 8987ea2cd58..88f59d7bf72 100644 --- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorageDruidModule.java @@ -19,14 +19,18 @@ package io.druid.storage.azure; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.CloudBlobClient; +import io.druid.firehose.azure.StaticAzureBlobStoreFirehoseFactory; import io.druid.guice.Binders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; +import io.druid.guice.annotations.Json; import io.druid.initialization.DruidModule; import java.net.URISyntaxException; @@ -64,7 +68,9 @@ public class AzureStorageDruidModule implements DruidModule { context.registerSubtypes(AzureLoadSpec.class); } - } + }, + new SimpleModule().registerSubtypes( + new NamedType(StaticAzureBlobStoreFirehoseFactory.class, "static-azure-blobstore")) ); } @@ -84,6 +90,10 @@ public class AzureStorageDruidModule implements DruidModule Binders.dataSegmentKillerBinder(binder) .addBinding(SCHEME) .to(AzureDataSegmentKiller.class).in(LazySingleton.class); + + Binders.taskLogsBinder(binder).addBinding(SCHEME).to(AzureTaskLogs.class); + JsonConfigProvider.bind(binder, "druid.indexer.logs", AzureTaskLogsConfig.class); + binder.bind(AzureTaskLogs.class).in(LazySingleton.class); } @Provides diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java new file mode 100644 index 00000000000..1e15bc70c5b --- /dev/null +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -0,0 +1,112 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.azure; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import com.microsoft.azure.storage.StorageException; +import io.druid.tasklogs.TaskLogs; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.concurrent.Callable; + +public class AzureTaskLogs implements TaskLogs { + + private static final Logger log = new Logger(AzureTaskLogs.class); + + private final AzureTaskLogsConfig config; + private final AzureStorage azureStorage; + + @Inject + public AzureTaskLogs(AzureTaskLogsConfig config, AzureStorage azureStorage) { + this.config = config; + this.azureStorage = azureStorage; + } + + @Override + public void pushTaskLog(final String taskid, final File logFile) throws IOException { + final String taskKey = getTaskLogKey(taskid); + log.info("Pushing task log %s to: %s", logFile, taskKey); + + try { + AzureUtils.retryAzureOperation( + new Callable() { + @Override + public Void call() throws Exception { + azureStorage.uploadBlob(logFile, config.getContainer(), taskKey); + return null; + } + }, + config.getMaxTries() + ); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Optional streamTaskLog(final String taskid, final long offset) throws IOException { + final String container = config.getContainer(); + final String taskKey = getTaskLogKey(taskid); + + try { + if (!azureStorage.getBlobExists(container, taskKey)) return Optional.absent(); + + return Optional.of( + new ByteSource() { + @Override + public InputStream openStream() throws IOException { + try { + final long start; + final long length = azureStorage.getBlobLength(container, taskKey); + + if (offset > 0 && offset < length) { + start = offset; + } else if (offset < 0 && (-1 * offset) < length) { + start = length + offset; + } else { + start = 0; + } + + InputStream stream = azureStorage.getBlobInputStream(container, taskKey); + stream.skip(start); + + return stream; + + } catch(Exception e) { + throw new IOException(e); + } + } + } + ); + } catch (StorageException | URISyntaxException e) { + throw new IOException(String.format("Failed to stream logs from: %s", taskKey), e); + } + } + + + private String getTaskLogKey(String taskid) { + return String.format("%s/%s/log", config.getPrefix(), taskid); + } +} diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogsConfig.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogsConfig.java new file mode 100644 index 00000000000..43bdf144080 --- /dev/null +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogsConfig.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.azure; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +public class AzureTaskLogsConfig { + @JsonProperty + @NotNull + private String container = null; + + @JsonProperty + @NotNull + private String prefix = null; + + @JsonProperty + @Min(1) + private int maxTries = 3; + + public AzureTaskLogsConfig() { + + } + + public AzureTaskLogsConfig(String container, String prefix, int maxTries) { + this.container = container; + this.prefix = prefix; + this.maxTries = maxTries; + } + + public String getContainer() { + return container; + } + + public String getPrefix() { + return prefix; + } + + public int getMaxTries() { + return maxTries; + } +} diff --git a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java index 8957112c27b..34dcb1c638f 100644 --- a/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java +++ b/extensions/azure-extensions/src/main/java/io/druid/storage/azure/AzureUtils.java @@ -18,10 +18,12 @@ package io.druid.storage.azure; import com.google.common.base.Predicate; +import com.metamx.common.RetryUtils; import com.microsoft.azure.storage.StorageException; import java.io.IOException; import java.net.URISyntaxException; +import java.util.concurrent.Callable; public class AzureUtils { @@ -48,5 +50,7 @@ public class AzureUtils } }; - + public static T retryAzureOperation(Callable f, int maxTries) throws Exception { + return RetryUtils.retry(f, AZURE_RETRY, maxTries); + } } diff --git a/extensions/azure-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/azure-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index c3cf64fa6da..b58fcb22692 100644 --- a/extensions/azure-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/extensions/azure-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -1 +1 @@ -io.druid.storage.azure.AzureStorageDruidModule \ No newline at end of file +io.druid.storage.azure.AzureStorageDruidModule diff --git a/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java new file mode 100644 index 00000000000..d6a64afa161 --- /dev/null +++ b/extensions/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java @@ -0,0 +1,140 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.storage.azure; + +import com.google.common.base.Charsets; +import com.google.common.base.Optional; +import com.google.common.io.ByteSource; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.easymock.EasyMockSupport; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.StringWriter; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +public class AzureTaskLogsTest extends EasyMockSupport { + + private static final String container = "test"; + private static final String prefix = "test/log"; + private static final String taskid = "taskid"; + private static final AzureTaskLogsConfig azureTaskLogsConfig = new AzureTaskLogsConfig(container, prefix, 3); + + private AzureStorage azureStorage; + private AzureTaskLogs azureTaskLogs; + + @Before + public void before() { + azureStorage = createMock(AzureStorage.class); + azureTaskLogs = new AzureTaskLogs(azureTaskLogsConfig, azureStorage); + } + + + @Test + public void testPushTaskLog() throws Exception { + final File tmpDir = Files.createTempDir(); + + try { + final File logFile = new File(tmpDir, "log"); + + azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log"); + expectLastCall(); + + replayAll(); + + azureTaskLogs.pushTaskLog(taskid, logFile); + + verifyAll(); + } finally { + FileUtils.deleteDirectory(tmpDir); + } + } + + @Test + public void testStreamTaskLogWithoutOffset() throws Exception { + final String testLog = "hello this is a log"; + + final String blobPath = prefix + "/" + taskid + "/log"; + expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true); + expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length()); + expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn( + new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))); + + + replayAll(); + + final Optional byteSource = azureTaskLogs.streamTaskLog(taskid, 0); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8"); + Assert.assertEquals(writer.toString(), testLog); + + verifyAll(); + } + + @Test + public void testStreamTaskLogWithPositiveOffset() throws Exception { + final String testLog = "hello this is a log"; + + final String blobPath = prefix + "/" + taskid + "/log"; + expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true); + expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length()); + expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn( + new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))); + + + replayAll(); + + final Optional byteSource = azureTaskLogs.streamTaskLog(taskid, 5); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8"); + Assert.assertEquals(writer.toString(), testLog.substring(5)); + + verifyAll(); + } + + @Test + public void testStreamTaskLogWithNegative() throws Exception { + final String testLog = "hello this is a log"; + + final String blobPath = prefix + "/" + taskid + "/log"; + expect(azureStorage.getBlobExists(container, blobPath)).andReturn(true); + expect(azureStorage.getBlobLength(container, blobPath)).andReturn((long) testLog.length()); + expect(azureStorage.getBlobInputStream(container, blobPath)).andReturn( + new ByteArrayInputStream(testLog.getBytes(Charsets.UTF_8))); + + + replayAll(); + + final Optional byteSource = azureTaskLogs.streamTaskLog(taskid, -3); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8"); + Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3)); + + verifyAll(); + } +}