From 8328d91b30e91d0bb12eb3b7efb2a3c57df81824 Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn <52679095+maytasm@users.noreply.github.com>
Date: Wed, 15 Apr 2020 11:27:33 -1000
Subject: [PATCH] Add missing integration tests for the compaction by the
 coordinator (#9644)

* Add API to trigger a compaction by the coordinator for integration tests

* Add missing integration tests for the compaction by the coordinator

* address comments
---
 docs/operations/api-reference.md              |   9 +
 .../docker/environment-configs/coordinator    |   1 +
 .../docker/environment-configs/middlemanager  |   1 +
 .../clients/CompactionResourceTestClient.java | 178 +++++++++
 .../CoordinatorResourceTestClient.java        |  23 ++
 .../duty/ITAutoCompactionTest.java            | 350 ++++++++++++++++++
 .../server/coordinator/DruidCoordinator.java  |  18 +-
 .../druid/server/http/CompactionResource.java |  79 ++++
 .../server/http/CoordinatorResource.java      |  16 -
 .../security/SecurityResourceFilterTest.java  |   4 +-
 .../org/apache/druid/cli/CliCoordinator.java  |   2 +
 11 files changed, 661 insertions(+), 20 deletions(-)
 create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
 create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 create mode 100644 server/src/main/java/org/apache/druid/server/http/CompactionResource.java

diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index db11cfe6c04..a66dd649f41 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -362,6 +362,15 @@ Returns total size and count for each interval within given isointerval.
 
 Returns total size and count for each datasource for each interval within given isointerval.
 
+#### Compaction Status
+
+##### GET
+
+* `/druid/coordinator/v1/compaction/progress?dataSource={dataSource}`
+
+Returns the total size of segments awaiting compaction for the given dataSource.
+This is only valid for a dataSource that has compaction enabled.
+
 #### Compaction Configuration
 
 ##### GET
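For illustration, a minimal sketch of calling the new progress endpoint from Java. The coordinator address (default port 8081) and the dataSource name are assumptions here; the response shape matches the `remainingSegmentSize` key returned by the new CompactionResource further below.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CompactionProgressExample
    {
      public static void main(String[] args) throws Exception
      {
        // Assumed coordinator address and dataSource; adjust for your cluster.
        URL url = new URL("http://localhost:8081/druid/coordinator/v1/compaction/progress?dataSource=wikipedia");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (InputStream in = conn.getInputStream()) {
          // Prints e.g. {"remainingSegmentSize":14312}; an unknown or
          // compaction-disabled dataSource yields 400 with {"error":"unknown dataSource"}.
          System.out.println(new String(in.readAllBytes()));
        }
      }
    }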
diff --git a/integration-tests/docker/environment-configs/coordinator b/integration-tests/docker/environment-configs/coordinator
index 6bd0260b813..28455ce6aee 100644
--- a/integration-tests/docker/environment-configs/coordinator
+++ b/integration-tests/docker/environment-configs/coordinator
@@ -35,3 +35,4 @@ druid_manager_lookups_threadPoolSize=2
 druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator
 druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"]
 druid_server_https_crlPath=/tls/revocations.crl
+druid_coordinator_period_indexingPeriod=PT180000S
\ No newline at end of file
diff --git a/integration-tests/docker/environment-configs/middlemanager b/integration-tests/docker/environment-configs/middlemanager
index 9cbe41bce8f..0ca4dbcdcdf 100644
--- a/integration-tests/docker/environment-configs/middlemanager
+++ b/integration-tests/docker/environment-configs/middlemanager
@@ -37,3 +37,4 @@ druid_indexer_task_chathandler_type=announce
 druid_auth_basic_common_cacheDirectory=/tmp/authCache/middleManager
 druid_startup_logging_logProperties=true
 druid_server_https_crlPath=/tls/revocations.crl
+druid_worker_capacity=20
\ No newline at end of file
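The PT180000S indexing period above is what lets the tests own the compaction schedule: it is an ISO-8601 duration of 50 hours, so the coordinator's scheduled compaction duty effectively never fires during a test run and compaction happens only when the new force-trigger API is called. The druid_worker_capacity=20 setting gives the middleManager enough task slots for the task-slot tests below. A quick check of the duration arithmetic:

    import java.time.Duration;

    public class IndexingPeriodCheck
    {
      public static void main(String[] args)
      {
        // PT180000S = 180000 seconds = 50 hours, far longer than any test run.
        System.out.println(Duration.parse("PT180000S").toHours()); // 50
      }
    }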
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
new file mode 100644
index 00000000000..60f3930edb3
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.clients;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.net.URL;
+import java.util.Map;
+
+public class CompactionResourceTestClient
+{
+  private final ObjectMapper jsonMapper;
+  private final HttpClient httpClient;
+  private final String coordinator;
+  private final StatusResponseHandler responseHandler;
+
+  @Inject
+  CompactionResourceTestClient(
+      ObjectMapper jsonMapper,
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.httpClient = httpClient;
+    this.coordinator = config.getCoordinatorUrl();
+    this.responseHandler = StatusResponseHandler.getInstance();
+  }
+
+  private String getCoordinatorURL()
+  {
+    return StringUtils.format(
+        "%s/druid/coordinator/v1/",
+        coordinator
+    );
+  }
+
+  public void submitCompactionConfig(final DataSourceCompactionConfig dataSourceCompactionConfig) throws Exception
+  {
+    String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.POST, new URL(url)).setContent(
+            "application/json",
+            jsonMapper.writeValueAsBytes(dataSourceCompactionConfig)
+        ), responseHandler
+    ).get();
+
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while submitting compaction config status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+  }
+
+  public void deleteCompactionConfig(final String dataSource) throws Exception
+  {
+    String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+    StatusResponseHolder response = httpClient.go(new Request(HttpMethod.DELETE, new URL(url)), responseHandler).get();
+
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while deleting compaction config status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+  }
+
+  public CoordinatorCompactionConfig getCoordinatorCompactionConfigs() throws Exception
+  {
+    String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.GET, new URL(url)), responseHandler
+    ).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while getting compaction config status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+    return jsonMapper.readValue(response.getContent(), new TypeReference<CoordinatorCompactionConfig>() {});
+  }
+
+  public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSource) throws Exception
+  {
+    String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.GET, new URL(url)), responseHandler
+    ).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while getting compaction config status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+    return jsonMapper.readValue(response.getContent(), new TypeReference<DataSourceCompactionConfig>() {});
+  }
+
+  public void forceTriggerAutoCompaction() throws Exception
+  {
+    String url = StringUtils.format("%scompaction/compact", getCoordinatorURL());
+    StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while force triggering auto compaction status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+  }
+
+  public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception
+  {
+    String url = StringUtils.format(
+        "%sconfig/compaction/taskslots?ratio=%s&max=%s",
+        getCoordinatorURL(),
+        StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
+        StringUtils.urlEncode(maxCompactionTaskSlots.toString())
+    );
+    StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while updating compaction task slot status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+  }
+
+  public Map<String, String> getCompactionProgress(String dataSource) throws Exception
+  {
+    String url = StringUtils.format("%scompaction/progress?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+    StatusResponseHolder response = httpClient.go(
+        new Request(HttpMethod.GET, new URL(url)), responseHandler
+    ).get();
+    if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE(
+          "Error while getting compaction progress status[%s] content[%s]",
+          response.getStatus(),
+          response.getContent()
+      );
+    }
+    return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, String>>() {});
+  }
+}
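Taken together, a typical test interaction with this client looks roughly like the sketch below. The constructor arguments mirror the call in ITAutoCompactionTest further down; the per-argument comments are assumptions about what each position means rather than documented parameter names.

    import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
    import org.apache.druid.testing.clients.CompactionResourceTestClient;
    import org.joda.time.Period;

    class CompactionClientSketch
    {
      // compactionResource is assumed to be an injected CompactionResourceTestClient,
      // as in ITAutoCompactionTest below.
      static void enableAndRunCompaction(CompactionResourceTestClient compactionResource) throws Exception
      {
        DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
            "wikipedia_index_test",  // dataSource (hypothetical name)
            null,                    // assumed: task priority, default
            null,                    // assumed: input segment size bytes, default
            10000,                   // maxRowsPerSegment
            Period.seconds(0),       // skipOffsetFromLatest: consider all segments
            null,                    // assumed: tuning config, default
            null                     // assumed: task context, default
        );
        compactionResource.submitCompactionConfig(compactionConfig);
        compactionResource.forceTriggerAutoCompaction();
      }
    }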
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
index 5f96ca00b49..37bd2f30531 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
@@ -79,6 +79,11 @@ public class CoordinatorResourceTestClient
     return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
+  private String getFullSegmentsMetadataURL(String dataSource)
+  {
+    return StringUtils.format("%smetadata/datasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+  }
+
   private String getIntervalsURL(String dataSource)
   {
     return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
@@ -113,6 +118,24 @@ public class CoordinatorResourceTestClient
     return segments;
   }
 
+  public List<DataSegment> getFullSegmentsMetadata(final String dataSource)
+  {
+    List<DataSegment> segments;
+    try {
+      StatusResponseHolder response = makeRequest(HttpMethod.GET, getFullSegmentsMetadataURL(dataSource));
+
+      segments = jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<DataSegment>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return segments;
+  }
+
   // return a list of the segment dates for the specified datasource
   public List<String> getSegmentIntervals(final String dataSource)
   {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
new file mode 100644
index 00000000000..592c91034da
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CompactionResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
+import org.apache.druid.tests.indexer.AbstractIndexerTest;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Period;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@Test(groups = {TestNGGroup.OTHER_INDEX})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAutoCompactionTest extends AbstractIndexerTest
+{
+  private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
+  private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
+  private static final Period SKIP_OFFSET_FROM_LATEST = Period.seconds(0);
+
+  @Inject
+  protected CompactionResourceTestClient compactionResource;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  private String fullDatasourceName;
+
+  @BeforeMethod
+  public void setup() throws Exception
+  {
+    // Set compaction task slots to 10
+    updateCompactionTaskSlot(0.5, 10);
+    fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
+  }
+
+  @Test
+  public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
+      forceTriggerAutoCompaction();
+      // ...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
+      verifySegmentsCount(5);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
+      forceTriggerAutoCompaction();
+      // ...compacted into 1 new segment for the remaining day. 2 days compacted and 0 days uncompacted. (6 total)
+      verifySegmentsCount(6);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+    }
+  }
+
+  @Test
+  public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      // Dummy compaction config which will be overwritten
+      submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST);
+      // The new compaction config should overwrite the existing one
+      submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
+      forceTriggerAutoCompaction();
+
+      // Instead of merging segments, the updated config will split segments!
+      // ...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
+      verifySegmentsCount(14);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(10, 1);
+
+      checkCompactionIntervals(intervalsBeforeCompaction);
+    }
+  }
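The expected count of 14 follows from simple arithmetic, assuming (as the comments above imply; the row counts are inferred from the expected totals rather than read from wikipedia_index_task.json) that each day holds 5 rows:

    class SegmentCountMath
    {
      public static void main(String[] args)
      {
        int rowsPerDay = 5;         // assumption inferred from the expected totals
        int days = 2;
        int maxRowsPerSegment = 1;  // the updated compaction config
        int compacted = days * (int) Math.ceil((double) rowsPerDay / maxRowsPerSegment);
        // 10 one-row compacted segments plus the 4 original (now overshadowed) segments
        System.out.println(compacted + 4); // 14, matching verifySegmentsCount(14)
      }
    }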
+
+  @Test
+  public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
+      deleteCompactionConfig();
+      forceTriggerAutoCompaction();
+
+      // ...should remain unchanged (4 total)
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(0, null);
+
+      checkCompactionIntervals(intervalsBeforeCompaction);
+    }
+  }
+
+  @Test
+  public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      // Skips first day. Should only compact one out of two days.
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
+
+      // Set compactionTaskSlotRatio to 0 to prevent any compaction
+      updateCompactionTaskSlot(0, 100);
+      forceTriggerAutoCompaction();
+      // ...should remain unchanged (4 total)
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(0, null);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      // Set maxCompactionTaskSlots to 0 to prevent any compaction
+      updateCompactionTaskSlot(0.1, 0);
+      forceTriggerAutoCompaction();
+      // ...should remain unchanged (4 total)
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(0, null);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      // Update compaction slots to be 1
+      updateCompactionTaskSlot(1, 1);
+      forceTriggerAutoCompaction();
+      // One day compacted (1 new segment) and one day remains uncompacted. (5 total)
+      verifySegmentsCount(5);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+      Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
+      // Run compaction again to compact the remaining day
+      forceTriggerAutoCompaction();
+      // Remaining day compacted (1 new segment). Now both days compacted (6 total)
+      verifySegmentsCount(6);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+    }
+  }
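The three updateCompactionTaskSlot calls above make sense once you account for the middleManager's druid_worker_capacity=20: the coordinator caps compaction at roughly min(ratio x total worker capacity, maxSlots) task slots per run. The formula below is a sketch of the duty's behavior under that assumption, not code copied from CompactSegments:

    class CompactionSlotMath
    {
      // Assumed slot arithmetic, matching the outcomes this test expects.
      static int compactionTaskSlots(double ratio, int maxSlots, int totalWorkerCapacity)
      {
        return Math.min((int) (ratio * totalWorkerCapacity), maxSlots);
      }

      public static void main(String[] args)
      {
        System.out.println(compactionTaskSlots(0.0, 100, 20)); // 0: ratio 0 blocks compaction
        System.out.println(compactionTaskSlots(0.1, 0, 20));   // 0: max 0 blocks compaction
        System.out.println(compactionTaskSlots(1.0, 1, 20));   // 1: one compaction task per run
      }
    }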
+
+  private void loadData(String indexTask) throws Exception
+  {
+    String taskSpec = getResourceAsString(indexTask);
+    taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    final String taskID = indexer.submitTask(taskSpec);
+    LOG.info("TaskID for loading index task %s", taskID);
+    indexer.waitUntilTaskCompletes(taskID);
+
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Load"
+    );
+  }
+
+  private void verifyQuery(String queryResource) throws Exception
+  {
+    String queryResponseTemplate;
+    try {
+      InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResource);
+      queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", queryResource);
+    }
+
+    queryResponseTemplate = StringUtils.replace(
+        queryResponseTemplate,
+        "%%DATASOURCE%%",
+        fullDatasourceName
+    );
+
+    queryHelper.testQueriesFromString(queryResponseTemplate, 2);
+  }
+
+  private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception
+  {
+    DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
+        fullDatasourceName,
+        null,
+        null,
+        maxRowsPerSegment,
+        skipOffsetFromLatest,
+        null,
+        null
+    );
+    compactionResource.submitCompactionConfig(compactionConfig);
+
+    // Verify that the compaction config is updated correctly.
+    CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
+    for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+      if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
+        foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+      }
+    }
+    Assert.assertNotNull(foundDataSourceCompactionConfig);
+    Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment);
+    Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
+
+    foundDataSourceCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
+    Assert.assertNotNull(foundDataSourceCompactionConfig);
+    Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment);
+    Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
+  }
+
+  private void deleteCompactionConfig() throws Exception
+  {
+    compactionResource.deleteCompactionConfig(fullDatasourceName);
+
+    // Verify that the compaction config was removed.
+    CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
+    for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+      if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
+        foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+      }
+    }
+    Assert.assertNull(foundDataSourceCompactionConfig);
+  }
+
+  private void forceTriggerAutoCompaction() throws Exception
+  {
+    compactionResource.forceTriggerAutoCompaction();
+    waitForAllTasksToComplete();
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Compaction"
+    );
+  }
+
+  private void verifySegmentsCount(int numExpectedSegments)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          int metadataSegmentCount = coordinator.getSegments(fullDatasourceName).size();
+          LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
+          return metadataSegmentCount == numExpectedSegments;
+        },
+        "Compaction segment count check"
+    );
+  }
+
+  private void checkCompactionIntervals(List<String> expectedIntervals)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          final List<String> actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName);
+          actualIntervals.sort(null);
+          return actualIntervals.equals(expectedIntervals);
+        },
+        "Compaction interval check"
+    );
+  }
+
+  private void verifySegmentsCompacted(int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment)
+  {
+    List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
+    List<DataSegment> foundCompactedSegments = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      if (segment.getLastCompactionState() != null) {
+        foundCompactedSegments.add(segment);
+      }
+    }
+    Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount);
+    for (DataSegment compactedSegment : foundCompactedSegments) {
+      Assert.assertNotNull(compactedSegment.getLastCompactionState());
+      Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
+      Assert.assertEquals(
+          compactedSegment.getLastCompactionState().getPartitionsSpec().getMaxRowsPerSegment(),
+          expectedMaxRowsPerSegment
+      );
+      Assert.assertEquals(
+          compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
+          SecondaryPartitionType.LINEAR
+      );
+    }
+  }
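verifySegmentsCompacted expects SecondaryPartitionType.LINEAR because a compaction config that sets only maxRowsPerSegment implies dynamic partitioning. A small sketch of that relationship, assuming the DynamicPartitionsSpec constructor takes maxRowsPerSegment and maxTotalRows:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.indexer.partitions.PartitionsSpec;

    class PartitionsSpecCheck
    {
      public static void main(String[] args)
      {
        // Dynamic partitioning reports the LINEAR secondary partition type,
        // which is what each compacted segment's lastCompactionState carries.
        PartitionsSpec spec = new DynamicPartitionsSpec(10000, null);
        System.out.println(spec.getType());              // LINEAR
        System.out.println(spec.getMaxRowsPerSegment()); // 10000
      }
    }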
+
+  private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots) throws Exception
+  {
+    compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots);
+    // Verify that the compaction config is updated correctly.
+    CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
+    Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 72c648d8488..f8c3f43c76f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -513,6 +513,13 @@ public class DruidCoordinator
     }
   }
 
+  public void runCompactSegmentsDuty()
+  {
+    final int startingLeaderCounter = coordLeaderSelector.localTerm();
+    DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter);
+    compactSegmentsDuty.run();
+  }
+
   private void becomeLeader()
   {
     synchronized (lock) {
@@ -611,7 +618,7 @@ public class DruidCoordinator
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
-    duties.add(compactSegments);
+    duties.addAll(makeCompactSegmentsDuty());
     duties.addAll(indexingServiceDuties);
 
     log.debug(
@@ -621,13 +628,18 @@ public class DruidCoordinator
     return ImmutableList.copyOf(duties);
   }
 
-  public class DutiesRunnable implements Runnable
+  private List<CoordinatorDuty> makeCompactSegmentsDuty()
+  {
+    return ImmutableList.of(compactSegments);
+  }
+
+  private class DutiesRunnable implements Runnable
   {
     private final long startTimeNanos = System.nanoTime();
     private final List<CoordinatorDuty> duties;
     private final int startingLeaderCounter;
 
-    protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
+    private DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
    {
       this.duties = duties;
       this.startingLeaderCounter = startingLeaderCounter;
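Because runCompactSegmentsDuty executes the duty synchronously on the calling thread, the HTTP endpoint that fronts it (added in CompactionResource below) returns only after the duty run finishes; the compaction tasks it submits still run asynchronously, which is why forceTriggerAutoCompaction in the test also waits for task completion. A minimal sketch of invoking it, with the coordinator address assumed and authentication omitted (the endpoint sits behind ConfigResourceFilter):

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class ForceCompactionExample
    {
      public static void main(String[] args) throws Exception
      {
        // POST with an empty body; the coordinator runs the CompactSegments
        // duty once and returns 200 OK when the run has completed.
        URL url = new URL("http://localhost:8081/druid/coordinator/v1/compaction/compact");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        System.out.println(conn.getResponseCode()); // 200 on success
      }
    }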
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
new file mode 100644
index 00000000000..fd4cf4fcb9d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.http.security.ConfigResourceFilter;
+import org.apache.druid.server.http.security.StateResourceFilter;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/druid/coordinator/v1/compaction")
+public class CompactionResource
+{
+  private final DruidCoordinator coordinator;
+
+  @Inject
+  public CompactionResource(
+      DruidCoordinator coordinator
+  )
+  {
+    this.coordinator = coordinator;
+  }
+
+  /**
+   * This API is meant to be used only by Druid's integration tests.
+   */
+  @POST
+  @Path("/compact")
+  @ResourceFilters(ConfigResourceFilter.class)
+  @VisibleForTesting
+  public Response forceTriggerCompaction()
+  {
+    coordinator.runCompactSegmentsDuty();
+    return Response.ok().build();
+  }
+
+  @GET
+  @Path("/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getCompactionProgress(
+      @QueryParam("dataSource") String dataSource
+  )
+  {
+    final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
+    if (notCompactedSegmentSizeBytes == null) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
+    } else {
+      return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
index c595b932b1d..92543e6a9c7 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
@@ -36,7 +36,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
 import java.util.Map;
 
 /**
@@ -161,19 +160,4 @@ public class CoordinatorResource
         )
     ).build();
   }
-
-  @GET
-  @Path("/remainingSegmentSizeForCompaction")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response getTotalSizeOfSegmentsAwaitingCompaction(
-      @QueryParam("dataSource") String dataSource
-  )
-  {
-    final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
-    if (notCompactedSegmentSizeBytes == null) {
-      return Response.status(Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
-    } else {
-      return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
-    }
-  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java b/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
index e0a5ba13334..c0b83145d35 100644
--- a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.server.ClientInfoResource;
 import org.apache.druid.server.QueryResource;
 import org.apache.druid.server.StatusResource;
 import org.apache.druid.server.http.BrokerResource;
+import org.apache.druid.server.http.CompactionResource;
 import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
 import org.apache.druid.server.http.CoordinatorResource;
 import org.apache.druid.server.http.DataSourcesResource;
@@ -72,7 +73,8 @@ public class SecurityResourceFilterTest extends ResourceFilterTestHelper
         getRequestPathsWithAuthorizer(StatusResource.class),
         getRequestPathsWithAuthorizer(SelfDiscoveryResource.class),
         getRequestPathsWithAuthorizer(BrokerQueryResource.class),
-        getRequestPathsWithAuthorizer(RouterResource.class)
+        getRequestPathsWithAuthorizer(RouterResource.class),
+        getRequestPathsWithAuthorizer(CompactionResource.class)
       )
     );
   }
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 8d1938dc09b..bc5d840e30d 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -72,6 +72,7 @@ import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.http.ClusterResource;
+import org.apache.druid.server.http.CompactionResource;
 import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
 import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
 import org.apache.druid.server.http.CoordinatorRedirectInfo;
@@ -196,6 +197,7 @@ public class CliCoordinator extends ServerRunnable
               .to(CoordinatorJettyServerInitializer.class);
 
           Jerseys.addResource(binder, CoordinatorResource.class);
+          Jerseys.addResource(binder, CompactionResource.class);
          Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
           Jerseys.addResource(binder, CoordinatorCompactionConfigsResource.class);
           Jerseys.addResource(binder, TiersResource.class);