Emit metrics for S3UploadThreadPool (#16616)

* Emit metrics for S3UploadThreadPool

* Address review comments

* Revert unnecessary formatting change

* Revert unnecessary formatting change in metrics.md file

* Address review comments

* Add metric for task duration

* Minor fix in metrics.md

* Add s3Key and uploadId in the log message

* Address review comments

* Create new instance of ServiceMetricEvent.Builder for thread safety

* Address review comments

* Address review comments
Akshat Jain 2024-06-21 11:36:47 +05:30 committed by GitHub
parent 35709de549
commit cd438b1918
8 changed files with 104 additions and 31 deletions

View File: metrics.md

@@ -508,6 +508,19 @@ These metrics are only available if the `OshiSysMonitor` module is included.
 |`sys/tcpv4/out/rsts`|Total "out reset" packets sent to reset the connection||Generally 0|
 |`sys/tcpv4/retrans/segs`|Total segments re-transmitted||Varies|
+
+## S3 multi-part upload
+
+These metrics are only available if the `druid-s3-extensions` module is included and if specific features are in use: MSQ export to S3, or durable intermediate storage on S3.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`s3/upload/part/queueSize`|Number of items currently waiting in the queue to be uploaded to S3. Each item in the queue corresponds to a single part in a multi-part upload.||Varies|
+|`s3/upload/part/queuedTime`|Milliseconds spent by a single item (or part) in the queue before its upload to S3 starts.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/part/time`|Milliseconds taken to upload a single part of a multi-part upload to S3.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/total/time`|Milliseconds taken to upload all parts of a multi-part upload to S3.|`uploadId`|Varies|
+|`s3/upload/total/bytes`|Total bytes uploaded to S3 during a multi-part upload.|`uploadId`|Varies|
+
 ## Cgroup
 These metrics are available on operating systems with the cgroup kernel feature. All the values are derived by reading from `/sys/fs/cgroup`.
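For readers wiring these new `s3/upload/*` metrics into dashboards, here is a minimal sketch of how an extension emits one of them through Druid's `ServiceEmitter`, condensed from the diffs below. The method name and values are hypothetical; the builder/emit pattern is the one the PR uses.

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

final class S3MetricsExample
{
  // Emits s3/upload/part/time with the dimensions listed in the table above.
  // "emitter" is whatever ServiceEmitter the process has injected.
  static void emitPartUploadTime(ServiceEmitter emitter, String uploadId, int partNumber, long millis)
  {
    ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
        .setDimension("uploadId", uploadId)
        .setDimension("partNumber", partNumber);
    emitter.emit(builder.setMetric("s3/upload/part/time", millis));
  }
}
```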

View File: RetryableS3OutputStream.java

@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -69,6 +70,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class RetryableS3OutputStream extends OutputStream
 {
+  // Metric-related constants.
+  private static final String METRIC_PREFIX = "s3/upload/total/";
+  private static final String METRIC_TOTAL_UPLOAD_TIME = METRIC_PREFIX + "time";
+  private static final String METRIC_TOTAL_UPLOAD_BYTES = METRIC_PREFIX + "bytes";
+
   private static final Logger LOG = new Logger(RetryableS3OutputStream.class);

   private final S3OutputConfig config;
@@ -208,14 +214,20 @@ public class RetryableS3OutputStream extends OutputStream
       org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
       LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);

-      // This should be emitted as a metric
-      long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length();
+      final long totalBytesUploaded = (currentChunk.id - 1) * chunkSize + currentChunk.length();
+      final long totalUploadTimeMillis = pushStopwatch.elapsed(TimeUnit.MILLISECONDS);
       LOG.info(
-          "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
+          "Pushed total [%d] parts containing [%d] bytes in [%d]ms for s3Key[%s], uploadId[%s].",
           futures.size(),
-          totalChunkSize,
-          pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
+          totalBytesUploaded,
+          totalUploadTimeMillis,
+          s3Key,
+          uploadId
       );
+
+      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension("uploadId", uploadId);
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_TIME, totalUploadTimeMillis));
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_BYTES, totalBytesUploaded));
     });

   try (Closer ignored = closer) {
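The byte-count arithmetic in `totalBytesUploaded` deserves a worked example. It relies on every chunk before the current one having been filled to exactly `chunkSize` bytes, which is how the stream rolls over to a new chunk. The values below are hypothetical:

```java
// Worked example of the byte-count arithmetic above.
long chunkSize = 5L * 1024 * 1024;           // 5 MiB per full chunk
long currentChunkId = 3;                     // 1-based id of the final chunk
long currentChunkLength = 1L * 1024 * 1024;  // final chunk holds a 1 MiB remainder

// Two full 5 MiB chunks precede the final 1 MiB chunk.
long totalBytesUploaded = (currentChunkId - 1) * chunkSize + currentChunkLength;
// == 2 * 5 MiB + 1 MiB == 11_534_336 bytes
```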

View File: S3UploadManager.java

@@ -25,10 +25,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
@@ -36,6 +39,7 @@ import org.apache.druid.utils.RuntimeInfo;
 import java.io.File;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * This class manages uploading files to S3 in chunks, while ensuring that the
@@ -44,18 +48,34 @@ import java.util.concurrent.Future;
 @ManageLifecycle
 public class S3UploadManager
 {
+  // Metric-related constants.
+  private static final String METRIC_PREFIX = "s3/upload/part/";
+  private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX + "queuedTime";
+  private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize";
+  private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time";
+
   private final ExecutorService uploadExecutor;
+  private final ServiceEmitter emitter;

   private static final Logger log = new Logger(S3UploadManager.class);

+  // For metrics regarding uploadExecutor.
+  private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+
   @Inject
-  public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo)
+  public S3UploadManager(
+      S3OutputConfig s3OutputConfig,
+      S3ExportConfig s3ExportConfig,
+      RuntimeInfo runtimeInfo,
+      ServiceEmitter emitter
+  )
   {
     int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
     int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig);
     this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
     log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]",
              poolSize, maxNumChunksOnDisk);
+    this.emitter = emitter;
   }
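The body of `createExecutorService` is not part of this diff. As a point of reference, the sketch below shows one plausible shape for a fixed pool with a bounded work queue matching the log line above; the class name and the `CallerRunsPolicy` overflow choice are illustrative assumptions, not the PR's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPools
{
  static ExecutorService create(int poolSize, int queueCapacity)
  {
    // Fixed-size pool; the bounded queue caps how many chunks can wait,
    // which in turn bounds how many chunk files accumulate on disk.
    // CallerRunsPolicy applies backpressure by running overflow tasks on
    // the submitting thread (an illustrative choice, not necessarily the PR's).
    return new ThreadPoolExecutor(
        poolSize,
        poolSize,
        0L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
  }
}
```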
/**
@@ -87,25 +107,36 @@ public class S3UploadManager
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    executorQueueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+      emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE, executorQueueSize.decrementAndGet()));
+
+      metricBuilder.setDimension("uploadId", uploadId).setDimension("partNumber", chunkNumber);
+      emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME, stopwatch.millisElapsed()));
+      stopwatch.restart();
+
+      return RetryUtils.retry(
+          () -> {
+            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
+            UploadPartResult uploadPartResult = uploadPartIfPossible(
+                s3Client,
+                uploadId,
+                config.getBucket(),
+                key,
+                chunkNumber,
+                chunkFile
+            );
+            if (!chunkFile.delete()) {
+              log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
+            }
+            emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME, stopwatch.millisElapsed()));
+            return uploadPartResult;
+          },
+          S3Utils.S3RETRY,
+          config.getMaxRetry()
+      );
+    });
   }

 @VisibleForTesting
@@ -149,4 +180,8 @@ public class S3UploadManager
     uploadExecutor.shutdown();
   }
+
+  protected void emitMetric(ServiceMetricEvent.Builder builder)
+  {
+    emitter.emit(builder);
+  }
 }
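The measurement pattern in `submitTask` is worth calling out: a single `Stopwatch` started at submission is read when the task begins running (queued time), then restarted to time the upload itself, while an `AtomicInteger` tracks queue depth. A self-contained sketch of the same pattern with plain JDK types follows; all names here are hypothetical and `doWork` stands in for the part upload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueTimingExample
{
  private final ExecutorService pool = Executors.newFixedThreadPool(4);
  private final AtomicInteger queueSize = new AtomicInteger(0);

  Future<?> submitTimed(Runnable doWork)
  {
    final long enqueuedAtNanos = System.nanoTime();
    queueSize.incrementAndGet();
    return pool.submit(() -> {
      // The task is now running: the gauge drops and the wait time is known.
      int remaining = queueSize.decrementAndGet();
      long queuedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueuedAtNanos);
      long startedAtNanos = System.nanoTime();
      doWork.run();
      long runMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAtNanos);
      // Emit queuedMillis, remaining, and runMillis here.
    });
  }
}
```

Sampling the gauge at decrement time mirrors the diff above: `s3/upload/part/queueSize` reports the depth observed as each task leaves the queue, not a periodic snapshot.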

View File: S3StorageConnectorProviderTest.java

@@ -32,6 +32,7 @@ import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorModule;
@@ -158,7 +159,8 @@ public class S3StorageConnectorProviderTest
         new S3UploadManager(
             new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
             new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-            new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
+            new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+            new StubServiceEmitter())
     )
 );

View File: RetryableS3OutputStreamTest.java

@@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -105,7 +106,8 @@ public class RetryableS3OutputStreamTest
     s3UploadManager = new S3UploadManager(
         new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
         new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
+        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+        new StubServiceEmitter());
   }

   @Test

View File: S3StorageConnectorTest.java

@@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
@@ -90,7 +91,8 @@ public class S3StorageConnectorTest
       storageConnector = new S3StorageConnector(s3OutputConfig, service, new S3UploadManager(
           s3OutputConfig,
           new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
+          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+          new StubServiceEmitter()));
     }
     catch (IOException e) {
       throw new RuntimeException(e);

View File: S3UploadManagerTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.storage.s3.output;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
@@ -43,14 +44,16 @@ public class S3UploadManagerTest
   private S3UploadManager s3UploadManager;
   private S3OutputConfig s3OutputConfig;
   private S3ExportConfig s3ExportConfig;
+  private StubServiceEmitter serviceEmitter;

   @Before
   public void setUp()
   {
     s3OutputConfig = new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1);
     s3ExportConfig = new S3ExportConfig("tempDir", new HumanReadableBytes("200MiB"), 1, null);
+    serviceEmitter = new StubServiceEmitter();
     final RuntimeInfo runtimeInfo = new DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0);
-    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo);
+    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo, serviceEmitter);
   }

   @Test
@@ -75,6 +78,10 @@ public class S3UploadManagerTest
     UploadPartResult futureResult = result.get();
     Assert.assertEquals(chunkId, futureResult.getPartNumber());
     Assert.assertEquals("etag", futureResult.getETag());
+
+    serviceEmitter.verifyEmitted("s3/upload/part/queuedTime", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/queueSize", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/time", 1);
   }

   @Test

View File: StubServiceEmitter.java

@@ -26,9 +26,9 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 /**
  * Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
 {
   private final List<Event> events = new ArrayList<>();
   private final List<AlertEvent> alertEvents = new ArrayList<>();
-  private final Map<String, List<ServiceMetricEvent>> metricEvents = new HashMap<>();
+  private final ConcurrentHashMap<String, List<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();

 public StubServiceEmitter()
 {
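The `HashMap` to `ConcurrentHashMap` switch matters because upload tasks now emit metrics from pool threads while the test thread registers and verifies them. Below is a hedged sketch of the kind of recording path this protects; the class and method are illustrative, and `StubServiceEmitter`'s actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class MetricStore
{
  private final ConcurrentHashMap<String, List<String>> metricEvents = new ConcurrentHashMap<>();

  void record(String metricName, String event)
  {
    // computeIfAbsent is atomic on ConcurrentHashMap, so two pool threads
    // recording the same metric name cannot clobber each other's list.
    // (Appends to the inner list would still need their own synchronization
    // if the same metric name is recorded concurrently.)
    metricEvents.computeIfAbsent(metricName, name -> new ArrayList<>()).add(event);
  }
}
```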