Race in Task report/log streamer (#12931)

* Fixing RACE in HTTP remote task Runner

* Changes in the interface

* Updating documentation

* Adding test cases to SwitchingTaskLogStreamer

* Adding more tests
This commit is contained in:
Karan Kumar 2022-08-26 06:26:01 +05:30 committed by GitHub
parent 72aba00e09
commit 275f834b2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 794 additions and 411 deletions

View File

@ -20,17 +20,17 @@
package org.apache.druid.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.File;
import java.io.InputStream;
public class NoopTaskLogs implements TaskLogs
{
private final Logger log = new Logger(TaskLogs.class);
@Override
public Optional<ByteSource> streamTaskLog(String taskid, long offset)
public Optional<InputStream> streamTaskLog(String taskid, long offset)
{
return Optional.absent();
}

View File

@ -20,10 +20,10 @@
package org.apache.druid.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import org.apache.druid.guice.annotations.ExtensionPoint;
import java.io.IOException;
import java.io.InputStream;
/**
* Something that knows how to stream logs for tasks.
@ -36,12 +36,11 @@ public interface TaskLogStreamer
*
* @param offset If zero, stream the entire log. If positive, attempt to read from this position onwards. If
* negative, attempt to read this many bytes from the end of the file (like <tt>tail -c</tt>).
*
* @return input supplier for this log, if available from this provider
* @return inputStream for this log, if available
*/
Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException;
Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException;
default Optional<ByteSource> streamTaskReports(final String taskid) throws IOException
default Optional<InputStream> streamTaskReports(final String taskid) throws IOException
{
return Optional.absent();
}

View File

@ -25,7 +25,6 @@ import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@ -66,54 +65,44 @@ public class OssTaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
}
private Optional<ByteSource> streamTaskFile(final long offset, String taskKey) throws IOException
private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
{
try {
final ObjectMetadata objectMetadata = client.getObjectMetadata(config.getBucket(), taskKey);
try {
final long start;
final long end = objectMetadata.getContentLength() - 1;
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
final long start;
final long end = objectMetadata.getContentLength() - 1;
if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
final GetObjectRequest request = new GetObjectRequest(config.getBucket(), taskKey);
request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
request.setRange(start, end);
final GetObjectRequest request = new GetObjectRequest(config.getBucket(), taskKey);
request.setMatchingETagConstraints(Collections.singletonList(objectMetadata.getETag()));
request.setRange(start, end);
return client.getObject(request).getObjectContent();
}
catch (OSSException e) {
throw new IOException(e);
}
}
}
);
return Optional.of(client.getObject(request).getObjectContent());
}
catch (OSSException e) {
throw new IOException(e);
}
}
catch (OSSException e) {
if ("NoSuchKey".equals(e.getErrorCode())

View File

@ -23,11 +23,15 @@ import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.model.AccessControlList;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.Grant;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.Owner;
import com.aliyun.oss.model.PutObjectRequest;
import com.aliyun.oss.model.PutObjectResult;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@ -42,12 +46,19 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import javax.annotation.Nonnull;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@RunWith(EasyMockRunner.class)
public class OssTaskLogsTest extends EasyMockSupport
@ -65,6 +76,9 @@ public class OssTaskLogsTest extends EasyMockSupport
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new ClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new ClientException(new NullPointerException());
private static final String LOG_CONTENTS = "log_contents";
private static final String REPORT_CONTENTS = "report_contents";
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@ -113,12 +127,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
EasyMock.verify(ossClient, timeSupplier);
@ -147,12 +156,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
EasyMock.verify(ossClient, timeSupplier);
@ -181,12 +185,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killAll();
}
catch (IOException e) {
@ -217,12 +216,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(ossClient, timeSupplier);
@ -250,12 +244,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(ossClient, timeSupplier);
@ -283,12 +272,7 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.replay(ossClient, timeSupplier);
OssTaskLogsConfig config = new OssTaskLogsConfig();
config.setBucket(TEST_BUCKET);
config.setPrefix(TEST_PREFIX);
OssInputDataConfig inputDataConfig = new OssInputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
OssTaskLogs taskLogs = new OssTaskLogs(ossClient, config, inputDataConfig, timeSupplier);
OssTaskLogs taskLogs = getOssTaskLogs();
taskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
@ -300,6 +284,115 @@ public class OssTaskLogsTest extends EasyMockSupport
EasyMock.verify(ossClient, timeSupplier);
}
/**
 * Streams a whole task log (offset 0) through {@code OssTaskLogs#streamTaskLog} and checks that the
 * bytes served by the mocked OSS client come back unchanged.
 */
@Test
public void test_taskLog_fetch() throws IOException
{
  EasyMock.reset(ossClient);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  OSSObject ossObject = new OSSObject();
  ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
  EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
  EasyMock.replay(ossClient);

  OssTaskLogs ossTaskLogs = getOssTaskLogs();
  Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, 0);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue(inputStreamOptional.isPresent());

  final String taskLogs;
  // The caller owns the returned stream, so close it (via the reader) once it has been drained.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS, taskLogs);
  // Confirm that both the metadata lookup and the object fetch were actually issued.
  EasyMock.verify(ossClient);
}
/**
 * Streams a task log from a positive offset (1) and checks that the content served by the mocked
 * OSS client for that request is returned unchanged.
 */
@Test
public void test_taskLog_fetch_withRange() throws IOException
{
  EasyMock.reset(ossClient);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  OSSObject ossObject = new OSSObject();
  // The mock plays the role of the ranged GET, so it serves the log minus its first byte.
  ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
  EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
  EasyMock.replay(ossClient);

  OssTaskLogs ossTaskLogs = getOssTaskLogs();
  Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, 1);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue(inputStreamOptional.isPresent());

  final String taskLogs;
  // The caller owns the returned stream, so close it (via the reader) once it has been drained.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
  // Confirm that both the metadata lookup and the object fetch were actually issued.
  EasyMock.verify(ossClient);
}
/**
 * Streams a task log using a negative offset (read the last {@code length - 1} bytes) and checks
 * that the content served by the mocked OSS client for that request is returned unchanged.
 */
@Test
public void test_taskLog_fetch_withNegativeRange() throws IOException
{
  EasyMock.reset(ossClient);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  OSSObject ossObject = new OSSObject();
  // The mock plays the role of the ranged GET, so it serves the log minus its first byte.
  ossObject.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
  EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
  EasyMock.replay(ossClient);

  OssTaskLogs ossTaskLogs = getOssTaskLogs();
  Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue(inputStreamOptional.isPresent());

  final String taskLogs;
  // The caller owns the returned stream, so close it (via the reader) once it has been drained.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
  // Confirm that both the metadata lookup and the object fetch were actually issued.
  EasyMock.verify(ossClient);
}
/**
 * Streams a task report through {@code OssTaskLogs#streamTaskReports} and checks that the bytes
 * served by the mocked OSS client come back unchanged.
 */
@Test
public void test_taskReport_fetch() throws IOException
{
  EasyMock.reset(ossClient);
  String reportPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(REPORT_CONTENTS.length());
  EasyMock.expect(ossClient.getObjectMetadata(TEST_BUCKET, reportPath)).andReturn(objectMetadata);
  OSSObject ossObject = new OSSObject();
  ossObject.setObjectContent(new ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
  EasyMock.expect(ossClient.getObject(EasyMock.isA(GetObjectRequest.class))).andReturn(ossObject);
  EasyMock.replay(ossClient);

  OssTaskLogs ossTaskLogs = getOssTaskLogs();
  Optional<InputStream> inputStreamOptional = ossTaskLogs.streamTaskReports(KEY_1);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue(inputStreamOptional.isPresent());

  final String report;
  // The caller owns the returned stream, so close it (via the reader) once it has been drained.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    report = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(REPORT_CONTENTS, report);
  // Confirm that both the metadata lookup and the object fetch were actually issued.
  EasyMock.verify(ossClient);
}
/**
 * Builds an {@link OssTaskLogs} wired to the mocked OSS client and time supplier, configured with
 * the test bucket, the test prefix and a listing page size of {@code MAX_KEYS}.
 */
@Nonnull
private OssTaskLogs getOssTaskLogs()
{
  final OssInputDataConfig listingConfig = new OssInputDataConfig();
  listingConfig.setMaxListingLength(MAX_KEYS);

  final OssTaskLogsConfig taskLogsConfig = new OssTaskLogsConfig();
  taskLogsConfig.setBucket(TEST_BUCKET);
  taskLogsConfig.setPrefix(TEST_PREFIX);

  return new OssTaskLogs(ossClient, taskLogsConfig, listingConfig, timeSupplier);
}
private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
{
EasyMock.expect(ossClient.putObject(EasyMock.anyObject()))

View File

@ -20,7 +20,6 @@
package org.apache.druid.storage.azure;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@ -100,56 +99,45 @@ public class AzureTaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
return streamTaskFile(taskid, offset, getTaskLogKey(taskid));
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
return streamTaskFile(taskid, 0, getTaskReportsKey(taskid));
}
private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
throws IOException
{
final String container = config.getContainer();
try {
if (!azureStorage.getBlobExists(container, taskKey)) {
return Optional.absent();
}
try {
final long start;
final long length = azureStorage.getBlobLength(container, taskKey);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
final long start;
final long length = azureStorage.getBlobLength(container, taskKey);
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
stream.skip(start);
InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
stream.skip(start);
return stream;
}
catch (Exception e) {
throw new IOException(e);
}
}
}
);
return Optional.of(stream);
}
catch (Exception e) {
throw new IOException(e);
}
}
catch (StorageException | URISyntaxException e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);

View File

@ -22,7 +22,6 @@ package org.apache.druid.storage.azure;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@ -39,6 +38,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
@ -191,10 +191,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, 0);
final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, 0);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@ -214,10 +214,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, 5);
final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, 5);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(5));
verifyAll();
@ -237,10 +237,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskLog(TASK_ID, -3);
final Optional<InputStream> stream = azureTaskLogs.streamTaskLog(TASK_ID, -3);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog.substring(testLog.length() - 3));
verifyAll();
@ -260,10 +260,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@ -279,10 +279,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID_NOT_FOUND);
Assert.assertFalse(byteSource.isPresent());
Assert.assertFalse(stream.isPresent());
verifyAll();
}
@ -301,10 +301,10 @@ public class AzureTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = azureTaskLogs.streamTaskReports(TASK_ID);
final Optional<InputStream> stream = azureTaskLogs.streamTaskReports(TASK_ID);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
verifyAll();
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.storage.google;
import com.google.api.client.http.InputStreamContent;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@ -103,20 +102,21 @@ public class GoogleTaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid);
return streamTaskFile(taskid, offset, taskKey);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskReportKey(taskid);
return streamTaskFile(taskid, 0, taskKey);
}
private Optional<ByteSource> streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException
private Optional<InputStream> streamTaskFile(final String taskid, final long offset, String taskKey)
throws IOException
{
try {
if (!storage.exists(config.getBucket(), taskKey)) {
@ -124,32 +124,22 @@ public class GoogleTaskLogs implements TaskLogs
}
final long length = storage.size(config.getBucket(), taskKey);
try {
final long start;
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
final long start;
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
if (offset > 0 && offset < length) {
start = offset;
} else if (offset < 0 && (-1 * offset) < length) {
start = length + offset;
} else {
start = 0;
}
return new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start);
}
catch (Exception e) {
throw new IOException(e);
}
}
}
);
return Optional.of(new GoogleByteSource(storage, config.getBucket(), taskKey).openStream(start));
}
catch (Exception e) {
throw new IOException(e);
}
}
catch (IOException e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);

View File

@ -27,7 +27,6 @@ import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils;
@ -42,6 +41,7 @@ import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@ -121,10 +121,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, 0);
final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, 0);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), testLog);
verifyAll();
@ -144,10 +144,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);
final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, offset);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), expectedLog);
verifyAll();
@ -168,10 +168,10 @@ public class GoogleTaskLogsTest extends EasyMockSupport
replayAll();
final Optional<ByteSource> byteSource = googleTaskLogs.streamTaskLog(TASKID, offset);
final Optional<InputStream> stream = googleTaskLogs.streamTaskLog(TASKID, offset);
final StringWriter writer = new StringWriter();
IOUtils.copy(byteSource.get().openStream(), writer, "UTF-8");
IOUtils.copy(stream.get(), writer, "UTF-8");
Assert.assertEquals(writer.toString(), expectedLog);
verifyAll();

View File

@ -20,7 +20,6 @@
package org.apache.druid.storage.hdfs.tasklog;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import org.apache.druid.guice.Hdfs;
@ -88,44 +87,35 @@ public class HdfsTaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset) throws IOException
public Optional<InputStream> streamTaskLog(final String taskId, final long offset) throws IOException
{
final Path path = getTaskLogFileFromId(taskId);
return streamTaskFile(path, offset);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId) throws IOException
public Optional<InputStream> streamTaskReports(String taskId) throws IOException
{
final Path path = getTaskReportsFileFromId(taskId);
return streamTaskFile(path, 0);
}
private Optional<ByteSource> streamTaskFile(final Path path, final long offset) throws IOException
private Optional<InputStream> streamTaskFile(final Path path, final long offset) throws IOException
{
final FileSystem fs = path.getFileSystem(hadoopConfig);
if (fs.exists(path)) {
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
log.info("Reading task log from: %s", path);
final long seekPos;
if (offset < 0) {
final FileStatus stat = fs.getFileStatus(path);
seekPos = Math.max(0, stat.getLen() + offset);
} else {
seekPos = offset;
}
final FSDataInputStream inputStream = fs.open(path);
inputStream.seek(seekPos);
log.info("Read task log from: %s (seek = %,d)", path, seekPos);
return inputStream;
}
}
);
log.info("Reading task log from: %s", path);
final long seekPos;
if (offset < 0) {
final FileStatus stat = fs.getFileStatus(path);
seekPos = Math.max(0, stat.getLen() + offset);
} else {
seekPos = offset;
}
final FSDataInputStream inputStream = fs.open(path);
inputStream.seek(seekPos);
log.info("Read task log from: %s (seek = %,d)", path, seekPos);
return Optional.of(inputStream);
} else {
return Optional.absent();
}

View File

@ -115,6 +115,6 @@ public class HdfsTaskLogsTest
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
{
return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()));
return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get()));
}
}

View File

@ -25,7 +25,6 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
@ -65,54 +64,45 @@ public class S3TaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset) throws IOException
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "log");
return streamTaskFile(offset, taskKey);
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
final String taskKey = getTaskLogKey(taskid, "report.json");
return streamTaskFile(0, taskKey);
}
private Optional<ByteSource> streamTaskFile(final long offset, String taskKey) throws IOException
private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
{
try {
final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
final long start;
final long end = objectMetadata.getContentLength() - 1;
try {
final long start;
final long end = objectMetadata.getContentLength() - 1;
if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
.withMatchingETagConstraint(objectMetadata.getETag())
.withRange(start, end);
final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
.withMatchingETagConstraint(objectMetadata.getETag())
.withRange(start, end);
return service.getObject(request).getObjectContent();
}
catch (AmazonServiceException e) {
throw new IOException(e);
}
}
}
);
return Optional.of(service.getObject(request).getObjectContent());
}
catch (AmazonServiceException e) {
throw new IOException(e);
}
}
catch (AmazonS3Exception e) {
if (404 == e.getStatusCode()

View File

@ -22,12 +22,16 @@ package org.apache.druid.storage.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
@ -42,10 +46,17 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import javax.annotation.Nonnull;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
@RunWith(EasyMockRunner.class)
public class S3TaskLogsTest extends EasyMockSupport
@ -63,6 +74,8 @@ public class S3TaskLogsTest extends EasyMockSupport
private static final int MAX_KEYS = 1;
private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException());
private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
private static final String LOG_CONTENTS = "log_contents";
private static final String REPORT_CONTENTS = "report_contents";
@Mock
private CurrentTimeMillisSupplier timeSupplier;
@ -136,12 +149,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
@ -174,12 +182,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
EasyMock.verify(s3Client, timeSupplier);
@ -211,12 +214,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killAll();
}
catch (IOException e) {
@ -250,12 +248,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
@ -286,12 +279,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(s3Client, timeSupplier);
@ -322,12 +310,7 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.replay(s3Client, timeSupplier);
S3TaskLogsConfig config = new S3TaskLogsConfig();
config.setS3Bucket(TEST_BUCKET);
config.setS3Prefix(TEST_PREFIX);
S3InputDataConfig inputDataConfig = new S3InputDataConfig();
inputDataConfig.setMaxListingLength(MAX_KEYS);
S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier);
S3TaskLogs s3TaskLogs = getS3TaskLogs();
s3TaskLogs.killOlderThan(TIME_NOW);
}
catch (IOException e) {
@ -339,6 +322,131 @@ public class S3TaskLogsTest extends EasyMockSupport
EasyMock.verify(s3Client, timeSupplier);
}
/**
 * Streams a task log with offset 0 and expects a ranged S3 read covering the whole object
 * (bytes 0 through length - 1), with the object's contents returned unchanged.
 */
@Test
public void test_taskLog_fetch() throws IOException
{
  EasyMock.reset(s3Client);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  S3Object s3Object = new S3Object();
  s3Object.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.getBytes(StandardCharsets.UTF_8)));
  GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
  getObjectRequest.setRange(0, LOG_CONTENTS.length() - 1);
  getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
  EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
  EasyMock.replay(s3Client);

  S3TaskLogs s3TaskLogs = getS3TaskLogs();

  Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskLog(KEY_1, 0);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue("task log stream should be present", inputStreamOptional.isPresent());

  final String taskLogs;
  // Closing the reader also closes the underlying stream handed back by S3TaskLogs.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS, taskLogs);
  // Confirm that both the metadata lookup and the ranged read were actually issued.
  EasyMock.verify(s3Client);
}
/**
 * Streams a task log with a positive offset of 1 and expects a ranged S3 read starting at
 * byte 1 and ending at the last byte, yielding the log contents minus the first character.
 */
@Test
public void test_taskLog_fetch_withRange() throws IOException
{
  EasyMock.reset(s3Client);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  S3Object s3Object = new S3Object();
  s3Object.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
  GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
  getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
  getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
  EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
  EasyMock.replay(s3Client);

  S3TaskLogs s3TaskLogs = getS3TaskLogs();

  Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskLog(KEY_1, 1);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue("task log stream should be present", inputStreamOptional.isPresent());

  final String taskLogs;
  // Closing the reader also closes the underlying stream handed back by S3TaskLogs.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
  // Confirm that both the metadata lookup and the ranged read were actually issued.
  EasyMock.verify(s3Client);
}
/**
 * Streams a task log with a negative offset of {@code -(length - 1)}, i.e. "the last
 * length - 1 bytes", and expects the same ranged S3 read as a positive offset of 1:
 * bytes 1 through length - 1.
 */
@Test
public void test_taskLog_fetch_withNegativeRange() throws IOException
{
  EasyMock.reset(s3Client);
  String logPath = TEST_PREFIX + "/" + KEY_1 + "/log";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(LOG_CONTENTS.length());
  EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
  S3Object s3Object = new S3Object();
  s3Object.setObjectContent(new ByteArrayInputStream(LOG_CONTENTS.substring(1).getBytes(StandardCharsets.UTF_8)));
  GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
  getObjectRequest.setRange(1, LOG_CONTENTS.length() - 1);
  getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
  EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
  EasyMock.replay(s3Client);

  S3TaskLogs s3TaskLogs = getS3TaskLogs();

  Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskLog(KEY_1, -1 * (LOG_CONTENTS.length() - 1));
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue("task log stream should be present", inputStreamOptional.isPresent());

  final String taskLogs;
  // Closing the reader also closes the underlying stream handed back by S3TaskLogs.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    taskLogs = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(LOG_CONTENTS.substring(1), taskLogs);
  // Confirm that both the metadata lookup and the ranged read were actually issued.
  EasyMock.verify(s3Client);
}
/**
 * Streams a task report and expects a ranged S3 read of the whole {@code report.json}
 * object (bytes 0 through length - 1), with the object's contents returned unchanged.
 */
@Test
public void test_report_fetch() throws IOException
{
  EasyMock.reset(s3Client);
  String reportPath = TEST_PREFIX + "/" + KEY_1 + "/report.json";
  ObjectMetadata objectMetadata = new ObjectMetadata();
  objectMetadata.setContentLength(REPORT_CONTENTS.length());
  EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, reportPath)).andReturn(objectMetadata);
  S3Object s3Object = new S3Object();
  s3Object.setObjectContent(new ByteArrayInputStream(REPORT_CONTENTS.getBytes(StandardCharsets.UTF_8)));
  GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, reportPath);
  getObjectRequest.setRange(0, REPORT_CONTENTS.length() - 1);
  getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
  EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
  EasyMock.replay(s3Client);

  S3TaskLogs s3TaskLogs = getS3TaskLogs();

  Optional<InputStream> inputStreamOptional = s3TaskLogs.streamTaskReports(KEY_1);
  // Fail with a readable assertion instead of an IllegalStateException from Optional.get().
  Assert.assertTrue("task report stream should be present", inputStreamOptional.isPresent());

  final String report;
  // Closing the reader also closes the underlying stream handed back by S3TaskLogs.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8))) {
    report = reader.lines().collect(Collectors.joining("\n"));
  }

  Assert.assertEquals(REPORT_CONTENTS, report);
  // Confirm that both the metadata lookup and the ranged read were actually issued.
  EasyMock.verify(s3Client);
}
/**
 * Builds the {@link S3TaskLogs} instance under test, pointing at {@code TEST_BUCKET} /
 * {@code TEST_PREFIX}, with the maximum listing length set to {@code MAX_KEYS}, and wired
 * to this test's {@code s3Client} and {@code timeSupplier}.
 */
@Nonnull
private S3TaskLogs getS3TaskLogs()
{
  final S3TaskLogsConfig taskLogsConfig = new S3TaskLogsConfig();
  taskLogsConfig.setS3Bucket(TEST_BUCKET);
  taskLogsConfig.setS3Prefix(TEST_PREFIX);

  final S3InputDataConfig listingConfig = new S3InputDataConfig();
  listingConfig.setMaxListingLength(MAX_KEYS);

  return new S3TaskLogs(s3Client, taskLogsConfig, listingConfig, timeSupplier);
}
private List<Grant> testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception
{
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.config.FileTaskLogsConfig;
@ -67,40 +66,22 @@ public class FileTaskLogs implements TaskLogs
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final File file = fileForTask(taskid, "log");
if (file.exists()) {
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(file, offset);
}
}
);
return Optional.of(LogUtils.streamFile(file, offset));
} else {
return Optional.absent();
}
}
@Override
public Optional<ByteSource> streamTaskReports(final String taskid)
public Optional<InputStream> streamTaskReports(final String taskid) throws IOException
{
final File file = fileForTask(taskid, "report.json");
if (file.exists()) {
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(file, 0);
}
}
);
return Optional.of(LogUtils.streamFile(file, 0));
} else {
return Optional.absent();
}

View File

@ -21,11 +21,12 @@ package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
@ -33,37 +34,93 @@ import java.util.List;
*/
public class SwitchingTaskLogStreamer implements TaskLogStreamer
{
private final List<TaskLogStreamer> providers;
private final TaskLogStreamer taskRunnerTaskLogStreamer;
private final List<TaskLogStreamer> deepStorageStreamers;
@Inject
public SwitchingTaskLogStreamer(List<TaskLogStreamer> providers)
public SwitchingTaskLogStreamer(
@Named("taskstreamer") TaskLogStreamer taskRunnerTaskLogStreamer,
List<TaskLogStreamer> deepStorageStreamer
)
{
this.providers = ImmutableList.copyOf(providers);
this.taskRunnerTaskLogStreamer = taskRunnerTaskLogStreamer;
this.deepStorageStreamers = ImmutableList.copyOf(deepStorageStreamer);
}
@Override
public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException
public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
{
for (TaskLogStreamer provider : providers) {
final Optional<ByteSource> stream = provider.streamTaskLog(taskid, offset);
IOException deferIOException = null;
try {
final Optional<InputStream> stream = taskRunnerTaskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return stream;
}
}
catch (IOException e) {
// defer first IO exception due to race in the way tasks update their exit status in the overlord
// It may happen that the task sent the log to deep storage but is still running with http chat handlers unregistered
// In such a case, catch and ignore the 1st IOException and try deepStorage for the log. If the log is still not found, return the caught exception
deferIOException = e;
}
for (TaskLogStreamer provider : deepStorageStreamers) {
try {
final Optional<InputStream> stream = provider.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return stream;
}
}
catch (IOException e) {
if (deferIOException != null) {
e.addSuppressed(deferIOException);
}
throw e;
}
}
// Could not find any InputStream. Throw deferred exception if exists
if (deferIOException != null) {
throw deferIOException;
}
return Optional.absent();
}
@Override
public Optional<ByteSource> streamTaskReports(String taskid) throws IOException
public Optional<InputStream> streamTaskReports(String taskid) throws IOException
{
for (TaskLogStreamer provider : providers) {
final Optional<ByteSource> stream = provider.streamTaskReports(taskid);
IOException deferIOException = null;
try {
final Optional<InputStream> stream = taskRunnerTaskLogStreamer.streamTaskReports(taskid);
if (stream.isPresent()) {
return stream;
}
}
catch (IOException e) {
// defer first IO exception due to race in the way tasks update their exit status in the overlord
// It may happen that the task sent the report to deep storage but the task is still running with http chat handlers unregistered
// In such a case, catch and ignore the 1st IOException and try deepStorage for the report. If the report is still not found, return the caught exception
deferIOException = e;
}
for (TaskLogStreamer provider : deepStorageStreamers) {
try {
final Optional<InputStream> stream = provider.streamTaskReports(taskid);
if (stream.isPresent()) {
return stream;
}
}
catch (IOException e) {
if (deferIOException != null) {
e.addSuppressed(deferIOException);
}
throw e;
}
}
// Could not find any InputStream. Throw deferred exception if exists
if (deferIOException != null) {
throw deferIOException;
}
return Optional.absent();
}
}

View File

@ -20,13 +20,13 @@
package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.tasklogs.TaskLogStreamer;
import java.io.IOException;
import java.io.InputStream;
/**
*/
@ -41,7 +41,7 @@ public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
}
@Override
public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException
public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {
@ -52,7 +52,7 @@ public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId) throws IOException
public Optional<InputStream> streamTaskReports(String taskId) throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {

View File

@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
@ -652,7 +651,7 @@ public class ForkingTaskRunner
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
public Optional<InputStream> streamTaskLog(final String taskid, final long offset) throws IOException
{
final ProcessHolder processHolder;
@ -664,17 +663,7 @@ public class ForkingTaskRunner
return Optional.absent();
}
}
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return LogUtils.streamFile(processHolder.logFile, offset);
}
}
);
return Optional.of(LogUtils.streamFile(processHolder.logFile, offset));
}
/**

View File

@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -611,7 +610,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Optional<ByteSource> streamTaskLog(final String taskId, final long offset)
public Optional<InputStream> streamTaskLog(final String taskId, final long offset) throws IOException
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -626,34 +625,26 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId,
Long.toString(offset)
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
try {
return Optional.of(httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
@Override
public Optional<ByteSource> streamTaskReports(final String taskId)
public Optional<InputStream> streamTaskReports(final String taskId) throws IOException
{
final ZkWorker zkWorker = findWorkerRunningTask(taskId);
@ -681,31 +672,24 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
try {
return Optional.of(httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
/**
* Adds a task to the pending queue.
* {@link #runPendingTasks()} should be called to run the pending task.

View File

@ -24,7 +24,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -61,6 +60,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -128,7 +128,7 @@ public class ThreadingTaskRunner
}
@Override
public Optional<ByteSource> streamTaskLog(String taskid, long offset)
public Optional<InputStream> streamTaskLog(String taskid, long offset)
{
// task logs will appear in the main indexer log, streaming individual task logs is not supported
return Optional.absent();

View File

@ -32,7 +32,6 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -962,7 +961,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
public Optional<ByteSource> streamTaskLog(String taskId, long offset)
public Optional<InputStream> streamTaskLog(String taskId, long offset) throws IOException
{
@SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@ -982,34 +981,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId,
Long.toString(offset)
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
try {
return Optional.of(httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
@Override
public Optional<ByteSource> streamTaskReports(String taskId)
public Optional<InputStream> streamTaskReports(String taskId) throws IOException
{
@SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
@ -1035,29 +1026,21 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
"/druid/worker/v1/chat/%s/liveReports",
taskId
);
return Optional.of(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}
);
try {
return Optional.of(httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
// Unwrap if possible
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new RuntimeException(e);
}
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditEntry;
@ -100,6 +99,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -1008,9 +1008,9 @@ public class OverlordResource
)
{
try {
final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
final Optional<InputStream> stream = taskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return Response.ok(stream.get().openStream()).build();
return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(
@ -1035,9 +1035,9 @@ public class OverlordResource
)
{
try {
final Optional<ByteSource> stream = taskLogStreamer.streamTaskReports(taskid);
final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
if (stream.isPresent()) {
return Response.ok(stream.get().openStream()).build();
return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(

View File

@ -24,7 +24,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.sun.jersey.spi.container.ResourceFilters;
@ -53,6 +52,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
/**
*/
@ -214,10 +214,10 @@ public class WorkerResource
.build();
}
try {
final Optional<ByteSource> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
final Optional<InputStream> stream = ((TaskLogStreamer) taskRunner).streamTaskLog(taskId, offset);
if (stream.isPresent()) {
return Response.ok(stream.get().openStream()).build();
return Response.ok(stream.get()).build();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}

View File

@ -63,7 +63,7 @@ public class FileTaskLogsTest
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream());
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get());
final String string = StringUtils.fromUtf8(bytes);
Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
@ -91,7 +91,7 @@ public class FileTaskLogsTest
Assert.assertEquals(
testReportString,
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get().openStream()))
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskReports("foo").get()))
);
}
@ -147,7 +147,7 @@ public class FileTaskLogsTest
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
{
return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()));
return StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get()));
}
private static class TestTaskReport implements TaskReport

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
/**
 * Tests for {@link SwitchingTaskLogStreamer}, which is constructed from a task-runner
 * streamer plus a list of deep-storage streamers.
 *
 * The cases cover: content served by the task-runner streamer, content served by a
 * deep-storage streamer when the task-runner streamer is empty or throws, and propagation
 * of {@link IOException}s (including suppressed ones) when nothing can serve the request.
 */
public class SwitchingTaskLogStreamerTest
{
  private static final String LOG = "LOG";
  private static final String REPORT = "REPORT";
  private static final String TASK_ID = "foo";
  private static final String LOG_EXCEPTION_MESSAGE = "expected log exception";
  private static final String REPORT_EXCEPTION_MESSAGE = "expected report exception";

  private final TaskLogStreamer streamer1 = new TestTaskLogStreamer(1);
  private final TaskLogStreamer streamer2 = new TestTaskLogStreamer(2);
  private final TaskLogStreamer emptyStreamer = new NoopTaskLogs();

  /**
   * Streamer that always fails. A fresh exception is created per call so that one failure
   * can be attached to another as a suppressed exception.
   */
  private final TaskLogStreamer ioExceptionStreamer = new TaskLogStreamer()
  {
    @Override
    public Optional<InputStream> streamTaskLog(String taskid, long offset) throws IOException
    {
      throw new IOE(LOG_EXCEPTION_MESSAGE);
    }

    @Override
    public Optional<InputStream> streamTaskReports(String taskid) throws IOException
    {
      throw new IOE(REPORT_EXCEPTION_MESSAGE);
    }
  };

  /**
   * The task-runner streamer has the content, so deep storage must not be consulted.
   */
  @Test
  public void foundInRemoteTasks() throws IOException
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        streamer1,
        Arrays.asList(
            streamer2,
            emptyStreamer
        )
    );
    Assert.assertEquals(
        getLogString(1, TASK_ID, 1),
        readFully(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1))
    );
    Assert.assertEquals(
        getReportString(1, TASK_ID),
        readFully(switchingTaskLogStreamer.streamTaskReports(TASK_ID))
    );
  }

  /**
   * The task-runner streamer returns nothing, so the first deep-storage streamer answers.
   */
  @Test
  public void foundInDeepStorage() throws IOException
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        emptyStreamer,
        Arrays.asList(
            streamer2,
            emptyStreamer
        )
    );
    Assert.assertEquals(
        getLogString(2, TASK_ID, 1),
        readFully(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1))
    );
    Assert.assertEquals(
        getReportString(2, TASK_ID),
        readFully(switchingTaskLogStreamer.streamTaskReports(TASK_ID))
    );
  }

  /**
   * The task-runner streamer throws, but the failure is deferred and deep storage answers.
   */
  @Test
  public void exceptionInTaskStreamerButFoundInDeepStrorage() throws IOException
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        ioExceptionStreamer,
        Arrays.asList(
            streamer2,
            emptyStreamer
        )
    );
    Assert.assertEquals(
        getLogString(2, TASK_ID, 1),
        readFully(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1))
    );
    Assert.assertEquals(
        getReportString(2, TASK_ID),
        readFully(switchingTaskLogStreamer.streamTaskReports(TASK_ID))
    );
  }

  /**
   * A deep-storage streamer throws: the exception propagates immediately, even though a
   * later streamer in the list could have served the request. Nothing was deferred, so
   * nothing is suppressed.
   */
  @Test
  public void exceptionInDeepStrorage()
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        emptyStreamer,
        Arrays.asList(
            ioExceptionStreamer,
            streamer2
        )
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1)),
        LOG_EXCEPTION_MESSAGE,
        0
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskReports(TASK_ID)),
        REPORT_EXCEPTION_MESSAGE,
        0
    );
  }

  /**
   * The task-runner streamer throws and deep storage has nothing: the deferred exception
   * is the one that surfaces.
   */
  @Test
  public void exceptionInRemoteTaskLogStreamerWithEmptyDeepStorage()
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        ioExceptionStreamer,
        Collections.singletonList(
            emptyStreamer
        )
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1)),
        LOG_EXCEPTION_MESSAGE,
        0
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskReports(TASK_ID)),
        REPORT_EXCEPTION_MESSAGE,
        0
    );
  }

  /**
   * Both the task-runner streamer and deep storage throw: the deep-storage exception
   * surfaces and carries the deferred task-runner exception as a suppressed exception.
   */
  @Test
  public void exceptionEverywhere()
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        ioExceptionStreamer,
        Collections.singletonList(
            ioExceptionStreamer
        )
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1)),
        LOG_EXCEPTION_MESSAGE,
        1
    );
    assertFailure(
        Assert.assertThrows(IOException.class, () -> switchingTaskLogStreamer.streamTaskReports(TASK_ID)),
        REPORT_EXCEPTION_MESSAGE,
        1
    );
  }

  /**
   * No streamer has anything and none fails: the result is absent rather than an error.
   */
  @Test
  public void empty() throws IOException
  {
    SwitchingTaskLogStreamer switchingTaskLogStreamer = new SwitchingTaskLogStreamer(
        emptyStreamer,
        Collections.singletonList(
            emptyStreamer
        )
    );
    Assert.assertFalse(switchingTaskLogStreamer.streamTaskLog(TASK_ID, 1).isPresent());
    Assert.assertFalse(switchingTaskLogStreamer.streamTaskReports(TASK_ID).isPresent());
  }

  /**
   * Asserts that the stream is present, reads it to the end as UTF-8 and closes it.
   */
  private static String readFully(Optional<InputStream> stream) throws IOException
  {
    Assert.assertTrue("expected a stream to be present", stream.isPresent());
    try (InputStream in = stream.get()) {
      return StringUtils.fromUtf8(ByteStreams.toByteArray(in));
    }
  }

  /**
   * Checks the message of the surfaced exception and how many exceptions it suppresses.
   * Every suppressed exception must carry the same message, because the failing stub
   * throws the same message for a given operation regardless of its position.
   */
  private static void assertFailure(IOException thrown, String expectedMessage, int expectedSuppressedCount)
  {
    Assert.assertEquals(expectedMessage, thrown.getMessage());
    final Throwable[] suppressed = thrown.getSuppressed();
    Assert.assertEquals(expectedSuppressedCount, suppressed.length);
    for (Throwable t : suppressed) {
      Assert.assertEquals(expectedMessage, t.getMessage());
    }
  }

  /**
   * Expected log payload produced by the {@link TestTaskLogStreamer} with the given id.
   */
  private static String getLogString(int id, String taskid, long offset)
  {
    return StringUtils.format(
        LOG + " with id %d, task %s and offset %d",
        id,
        taskid,
        offset
    );
  }

  /**
   * Expected report payload produced by the {@link TestTaskLogStreamer} with the given id.
   */
  private static String getReportString(int id, String taskid)
  {
    return StringUtils.format(
        REPORT + " with id %d, task %s",
        id,
        taskid
    );
  }

  /**
   * Streamer that always succeeds, embedding its id in the payload so tests can tell
   * which streamer served a request.
   */
  private static class TestTaskLogStreamer implements TaskLogStreamer
  {
    private final int id;

    public TestTaskLogStreamer(int id)
    {
      this.id = id;
    }

    @Override
    public Optional<InputStream> streamTaskLog(String taskid, long offset)
    {
      return Optional.of(new ByteArrayInputStream(getLogString(id, taskid, offset).getBytes(StandardCharsets.UTF_8)));
    }

    @Override
    public Optional<InputStream> streamTaskReports(String taskid)
    {
      return Optional.of(new ByteArrayInputStream(getReportString(id, taskid).getBytes(StandardCharsets.UTF_8)));
    }
  }
}

View File

@ -1116,7 +1116,7 @@ public class RemoteTaskRunnerTest
);
// Stream task reports from a running task.
final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get().openStream();
final InputStream in = remoteTaskRunner.streamTaskReports(task.getId()).get();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteStreams.copy(in, baos);
Assert.assertEquals(reportString, StringUtils.fromUtf8(baos.toByteArray()));

View File

@ -198,11 +198,15 @@ public class CliOverlord extends ServerRunnable
)
.toProvider(
new ListProvider<TaskLogStreamer>()
.add(TaskRunnerTaskLogStreamer.class)
.add(TaskLogs.class)
)
.in(LazySingleton.class);
binder.bind(TaskLogStreamer.class)
.annotatedWith(Names.named("taskstreamer"))
.to(TaskRunnerTaskLogStreamer.class)
.in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);