compaction/status API retains status for datasources that no longer exist, causing in-memory usage to grow unbounded (#11426)

* compaction/status API retains status for datasources that no longer exist, causing in-memory usage to grow unbounded

* fix test

Maytas Monsereenusorn 2021-07-13 09:48:06 +07:00 committed by GitHub
parent f5d53569ca
commit 05d5dd9289
7 changed files with 28 additions and 76 deletions


@@ -468,7 +468,7 @@ This is only valid for dataSource which has compaction enabled.
* `/druid/coordinator/v1/compaction/status`
Returns the status and statistics from the latest auto compaction run of all dataSources which have/had auto compaction enabled.
Returns the status and statistics from the auto compaction run of all dataSources which have auto compaction enabled in the latest run.
The response payload includes a list of `latestStatus` objects. Each `latestStatus` represents the status for a dataSource (which has/had auto compaction enabled).
The `latestStatus` object has the following keys:
* `dataSource`: name of the datasource for this status information
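To illustrate the documented endpoint and the 404 behavior introduced by this change, here is a minimal client sketch; the Coordinator address, datasource name, and plain-HTTP setup are illustrative assumptions, not part of the patch.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CompactionStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // Assumed Coordinator location and datasource name, for illustration only.
    String url = "http://localhost:8081/druid/coordinator/v1/compaction/status?dataSource=wikipedia";

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    if (response.statusCode() == 404) {
      // After this change, a datasource with no auto compaction status returns 404 (previously 400).
      System.out.println("No auto compaction status for this dataSource");
    } else {
      // On 200, the body is a JSON object whose "latestStatus" array holds one entry per
      // dataSource that had auto compaction enabled in the latest run.
      System.out.println(response.body());
    }
  }
}
```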


@@ -183,7 +183,9 @@ public class CompactionResourceTestClient
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
return null;
} else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction status status[%s] content[%s]",
response.getStatus(),


@@ -210,7 +210,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
// Auto compaction stats should be deleted as compaction config was deleted
Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@@ -234,18 +235,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
getAndAssertCompactionStatus(
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
0,
0,
0,
0,
0,
0,
0,
0);
Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)


@@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -213,11 +212,11 @@ public class CompactSegments implements CoordinatorDuty
}
} else {
LOG.info("compactionConfig is empty. Skip.");
updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
autoCompactionSnapshotPerDataSource.set(new HashMap<>());
}
} else {
LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
autoCompactionSnapshotPerDataSource.set(new HashMap<>());
}
return params.buildFromExisting()
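Both branches above now publish a fresh empty map instead of rebuilding snapshots from the previous run. A minimal sketch of that atomic-reset pattern, with illustrative names rather than the actual Druid fields:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotResetSketch
{
  // Illustrative stand-in for CompactSegments' autoCompactionSnapshotPerDataSource reference.
  private final AtomicReference<Map<String, String>> snapshotPerDataSource =
      new AtomicReference<>(new HashMap<>());

  public void onRunWithNothingToCompact()
  {
    // Publishing a new empty map atomically drops entries for datasources whose compaction
    // config was deleted, so the in-memory snapshot map can no longer grow without bound.
    snapshotPerDataSource.set(new HashMap<>());
  }

  public Map<String, String> currentSnapshots()
  {
    return snapshotPerDataSource.get();
  }
}
```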
@@ -312,16 +311,6 @@ public class CompactSegments implements CoordinatorDuty
Set<String> enabledDatasources = compactionConfigList.stream()
.map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
.collect(Collectors.toSet());
// Update AutoCompactionScheduleStatus for dataSource that now has auto compaction disabled
for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
if (!enabledDatasources.contains(snapshot.getKey())) {
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
snapshot.getKey(),
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
);
}
}
// Create and Update snapshot for dataSource that has auto compaction enabled
for (String compactionConfigDataSource : enabledDatasources) {
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@@ -420,42 +409,6 @@ public class CompactSegments implements CoordinatorDuty
return newContext;
}
/**
* This method can be used to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
* no compaction task is scheduled in this run. Currently, this method does not update compaction statistics
* (bytes, interval count, segment count, etc.) since we skip iterating through the segments and cannot get an update
* on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
* remain the same as in the previous snapshot).
*/
private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
)
{
Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
if (previousSnapshot != null) {
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
}
}
Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
currentRunAutoCompactionSnapshotBuilders,
AutoCompactionSnapshot.Builder::build
);
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
}
private CoordinatorStats makeStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numCompactionTasks,
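Together with the removed loop in the previous hunk (which re-added builders for datasources whose auto compaction had been disabled), the deleted helper above carried the previous snapshot's statistics forward on every run, so an entry never disappeared once created. A deliberately simplified sketch of that retention problem, not the actual method body:

```java
import java.util.HashMap;
import java.util.Map;

public class CarryForwardSketch
{
  // Simplified model of the removed behavior: entries from the previous snapshot are merged
  // into every new snapshot, so datasources that no longer have a compaction config never age out.
  static Map<String, Long> carryForward(Map<String, Long> previousSnapshot, Map<String, Long> currentRun)
  {
    Map<String, Long> next = new HashMap<>(currentRun);
    previousSnapshot.forEach(next::putIfAbsent); // stale datasources survive indefinitely
    return next;
  }

  public static void main(String[] args)
  {
    Map<String, Long> previous = Map.of("deleted_datasource", 123L);
    Map<String, Long> current = Map.of("active_datasource", 456L);
    // Under the old behavior "deleted_datasource" still shows up in the new snapshot.
    System.out.println(carryForward(previous, current));
  }
}
```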


@@ -74,7 +74,7 @@ public class CompactionResource
{
final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
if (notCompactedSegmentSizeBytes == null) {
return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
} else {
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
}
@@ -94,7 +94,7 @@ public class CompactionResource
} else {
AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (autoCompactionSnapshot == null) {
return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "unknown dataSource")).build();
}
snapshots = ImmutableList.of(autoCompactionSnapshot);
}


@@ -340,17 +340,15 @@ public class CompactSegmentsTest
);
}
// Run auto compaction without any dataSource in the compaction config
// Should still populate the result of everything fully compacted
doCompactSegments(compactSegments, new ArrayList<>());
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
for (int i = 0; i < 3; i++) {
// Test run auto compaction with one datasource auto compaction disabled
// Snapshot should not contain datasource with auto compaction disabled
List<DataSourceCompactionConfig> removedOneConfig = createCompactionConfigs();
removedOneConfig.remove(0);
doCompactSegments(compactSegments, removedOneConfig);
for (int i = 1; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
@@ -364,6 +362,15 @@ public class CompactSegmentsTest
);
}
// Run auto compaction without any dataSource in the compaction config
// Snapshot should be empty
doCompactSegments(compactSegments, new ArrayList<>());
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
Assert.assertTrue(compactSegments.getAutoCompactionSnapshot().isEmpty());
assertLastSegmentNotCompacted(compactSegments);
}


@@ -117,6 +117,6 @@ public class CompactionResourceTest
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
Assert.assertEquals(400, response.getStatus());
Assert.assertEquals(404, response.getStatus());
}
}