Add docs and integration tests for Auto-compaction snapshot status API (#10510)

* add docs and IT for Auto-compaction snapshot status API

* fix spellings

* fix test

* address comments
This commit is contained in:
Maytas Monsereenusorn 2020-10-14 06:42:22 -07:00 committed by GitHub
parent e8c5893c34
commit 9056d113d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 147 additions and 3 deletions

View File

@ -410,6 +410,32 @@ Returns total size and count for each datasource for each interval within given
Returns the total size of segments awaiting compaction for the given dataSource.
This is only valid for dataSource which has compaction enabled.
##### GET
* `/druid/coordinator/v1/compaction/status`
Returns the status and statistics from the latest auto compaction run of all dataSources which have/had auto compaction enabled.
The response payload includes a list of `latestStatus` objects. Each `latestStatus` represents the status for a dataSource (which has/had auto compaction enabled).
The `latestStatus` object has the following keys:
* `dataSource`: name of the datasource for this status information
* `scheduleStatus`: auto compaction scheduling status. Possible values are `NOT_ENABLED` and `RUNNING`. Returns `RUNNING ` if the dataSource has an active auto compaction config submitted otherwise, `NOT_ENABLED`
* `bytesAwaitingCompaction`: total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction)
* `bytesCompacted`: total bytes of this datasource that are already compacted with the spec set in the auto compaction config.
* `bytesSkipped`: total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
* `segmentCountAwaitingCompaction`: total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction)
* `segmentCountCompacted`: total number of segments of this datasource that are already compacted with the spec set in the auto compaction config.
* `segmentCountSkipped`: total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
* `intervalCountAwaitingCompaction`: total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction)
* `intervalCountCompacted`: total number of intervals of this datasource that are already compacted with the spec set in the auto compaction config.
* `intervalCountSkipped`: total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.
##### GET
* `/druid/coordinator/v1/compaction/status?dataSource={dataSource}`
Similar to the API `/druid/coordinator/v1/compaction/status` above but filters response to only return information for the {dataSource} given.
Note that {dataSource} given must have/had auto compaction enabled.
#### Compaction Configuration
##### GET

View File

@ -239,6 +239,18 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`tier/replication/factor`|Configured maximum replication factor in each tier.|tier.|Varies.|
|`tier/required/capacity`|Total capacity in bytes required in each tier.|tier.|Varies.|
|`tier/total/capacity`|Total capacity in bytes available in each tier.|tier.|Varies.|
|`compact/task/count`|Number of tasks issued in the auto compaction run.| |Varies.|
|`compactTask/maxSlot/count`|Max number of task slots that can be used for auto compaction tasks in the auto compaction run.| |Varies.|
|`compactTask/availableSlot/count`|Number of available task slots that can be used for auto compaction tasks in the auto compaction run. (this is max slot minus any currently running compaction task)| |Varies.|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
|`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|datasource.|Varies.|
|`segment/compacted/bytes`|Total bytes of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
|`segment/compacted/count`|Total number of segments of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
|`interval/compacted/count`|Total number of intervals of this datasource that are already compacted with the spec set in the auto compaction config.|datasource.|Varies.|
|`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`segment/skipCompact/count`|Total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](
../configuration/index.html#dynamic-configuration), then [log entries](../configuration/logging.md) for class

View File

@ -36,6 +36,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
import java.util.List;
import java.util.Map;
public class CompactionResourceTestClient
@ -175,4 +176,21 @@ public class CompactionResourceTestClient
}
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, String>>() {});
}
public Map<String, String> getCompactionStatus(String dataSource) throws Exception
{
String url = StringUtils.format("%scompaction/status?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction status status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
Map<String, List<Map<String, String>>> latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, List<Map<String, String>>>>() {});
return latestSnapshots.get("latestStatus").get(0);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@ -52,6 +53,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Test(groups = {TestNGGroup.COMPACTION})
@ -97,13 +99,36 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
14312,
0,
0,
2,
0,
0,
1,
0);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
22489,
0,
0,
3,
0,
0,
2,
0);
}
}
@ -200,7 +225,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
0,
0,
0,
0,
0,
0,
0,
0);
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
@ -208,6 +244,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
14312,
14311,
0,
2,
2,
0,
1,
1,
0);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
@ -215,6 +263,18 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
22489,
0,
0,
3,
0,
0,
2,
0);
}
}
@ -398,4 +458,32 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
}
private void getAndAssertCompactionStatus(
String fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
long bytesAwaitingCompaction,
long bytesCompacted,
long bytesSkipped,
long segmentCountAwaitingCompaction,
long segmentCountCompacted,
long segmentCountSkipped,
long intervalCountAwaitingCompaction,
long intervalCountCompacted,
long intervalCountSkipped
) throws Exception
{
Map<String, String> actualStatus = compactionResource.getCompactionStatus(fullDatasourceName);
Assert.assertNotNull(actualStatus);
Assert.assertEquals(actualStatus.get("scheduleStatus"), scheduleStatus.toString());
Assert.assertEquals(Long.parseLong(actualStatus.get("bytesAwaitingCompaction")), bytesAwaitingCompaction);
Assert.assertEquals(Long.parseLong(actualStatus.get("bytesCompacted")), bytesCompacted);
Assert.assertEquals(Long.parseLong(actualStatus.get("bytesSkipped")), bytesSkipped);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountAwaitingCompaction")), segmentCountAwaitingCompaction);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountCompacted")), segmentCountCompacted);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountSkipped")), segmentCountSkipped);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountAwaitingCompaction")), intervalCountAwaitingCompaction);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountCompacted")), intervalCountCompacted);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountSkipped")), intervalCountSkipped);
}
}