Add missing integration tests for the compaction by the coordinator (#9644)

* Add API to trigger a compaction by the coordinator for integration tests

* Add missing integration tests for the compaction by the coordinator

* address comments
This commit is contained in:
Maytas Monsereenusorn 2020-04-15 11:27:33 -10:00 committed by GitHub
parent b8f7128b2d
commit 8328d91b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 661 additions and 20 deletions

View File

@ -362,6 +362,15 @@ Returns total size and count for each interval within given isointerval.
Returns total size and count for each datasource for each interval within given isointerval.
#### Compaction Status
##### GET
* `/druid/coordinator/v1/compaction/progress?dataSource={dataSource}`
Returns the total size of segments awaiting compaction for the given dataSource.
This is only valid for dataSource which has compaction enabled.
#### Compaction Configuration
##### GET

View File

@ -35,3 +35,4 @@ druid_manager_lookups_threadPoolSize=2
druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator
druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"]
druid_server_https_crlPath=/tls/revocations.crl
druid_coordinator_period_indexingPeriod=PT180000S

View File

@ -37,3 +37,4 @@ druid_indexer_task_chathandler_type=announce
druid_auth_basic_common_cacheDirectory=/tmp/authCache/middleManager
druid_startup_logging_logProperties=true
druid_server_https_crlPath=/tls/revocations.crl
druid_worker_capacity=20

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.clients;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
import java.util.Map;
public class CompactionResourceTestClient
{
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final String coordinator;
private final StatusResponseHandler responseHandler;
@Inject
CompactionResourceTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.coordinator = config.getCoordinatorUrl();
this.responseHandler = StatusResponseHandler.getInstance();
}
private String getCoordinatorURL()
{
return StringUtils.format(
"%s/druid/coordinator/v1/",
coordinator
);
}
public void submitCompactionConfig(final DataSourceCompactionConfig dataSourceCompactionConfig) throws Exception
{
String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(dataSourceCompactionConfig)
), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submiting compaction config status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
}
public void deleteCompactionConfig(final String dataSource) throws Exception
{
String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.DELETE, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while deleting compaction config status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
}
public CoordinatorCompactionConfig getCoordinatorCompactionConfigs() throws Exception
{
String url = StringUtils.format("%sconfig/compaction", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction config status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(response.getContent(), new TypeReference<CoordinatorCompactionConfig>() {});
}
public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSource) throws Exception
{
String url = StringUtils.format("%sconfig/compaction/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction config status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(response.getContent(), new TypeReference<DataSourceCompactionConfig>() {});
}
public void forceTriggerAutoCompaction() throws Exception
{
String url = StringUtils.format("%scompaction/compact", getCoordinatorURL());
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while force trigger auto compaction status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
}
public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception
{
String url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
getCoordinatorURL(),
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while updating compaction task slot status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
}
public Map<String, String> getCompactionProgress(String dataSource) throws Exception
{
String url = StringUtils.format("%scompaction/progress?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting compaction progress status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<String, String>>() {});
}
}

View File

@ -79,6 +79,11 @@ public class CoordinatorResourceTestClient
return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}
private String getFullSegmentsMetadataURL(String dataSource)
{
return StringUtils.format("%smetadata/datasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}
private String getIntervalsURL(String dataSource)
{
return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
@ -113,6 +118,24 @@ public class CoordinatorResourceTestClient
return segments;
}
public List<DataSegment> getFullSegmentsMetadata(final String dataSource)
{
List<DataSegment> segments;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, getFullSegmentsMetadataURL(dataSource));
segments = jsonMapper.readValue(
response.getContent(), new TypeReference<List<DataSegment>>()
{
}
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
return segments;
}
// return a list of the segment dates for the specified datasource
public List<String> getSegmentIntervals(final String dataSource)
{

View File

@ -0,0 +1,350 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.coordinator.duty;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Period;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Test(groups = {TestNGGroup.OTHER_INDEX})
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAutoCompactionTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period SKIP_OFFSET_FROM_LATEST = Period.seconds(0);
@Inject
protected CompactionResourceTestClient compactionResource;
@Inject
private IntegrationTestingConfig config;
private String fullDatasourceName;
@BeforeMethod
public void setup() throws Exception
{
// Set comapction slot to 10
updateCompactionTaskSlot(0.5, 10);
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}
@Test
public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
forceTriggerAutoCompaction();
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
verifySegmentsCount(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total)
verifySegmentsCount(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@Test
public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
// Dummy compaction config which will be overwritten
submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST);
// New compaction config should overwrites the existing compaction config
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();
// Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
verifySegmentsCount(14);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@Test
public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
deleteCompactionConfig();
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@Test
public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
// Skips first day. Should only compact one out of two days.
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 100);
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
// Set maxCompactionTaskSlots to 0 to prevent any compaction
updateCompactionTaskSlot(0.1, 0);
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
forceTriggerAutoCompaction();
// One day compacted (1 new segment) and one day remains uncompacted. (5 total)
verifySegmentsCount(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
forceTriggerAutoCompaction();
// Remaining day compacted (1 new segment). Now both days compacted (6 total)
verifySegmentsCount(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Load"
);
}
private void verifyQuery(String queryResource) throws Exception
{
String queryResponseTemplate;
try {
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryResource);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResource);
}
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
queryHelper.testQueriesFromString(queryResponseTemplate, 2);
}
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest) throws Exception
{
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(fullDatasourceName,
null,
null,
maxRowsPerSegment,
skipOffsetFromLatest,
null,
null);
compactionResource.submitCompactionConfig(compactionConfig);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment);
Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
foundDataSourceCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
Assert.assertNotNull(foundDataSourceCompactionConfig);
Assert.assertEquals(foundDataSourceCompactionConfig.getMaxRowsPerSegment(), maxRowsPerSegment);
Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), skipOffsetFromLatest);
}
private void deleteCompactionConfig() throws Exception
{
compactionResource.deleteCompactionConfig(fullDatasourceName);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
foundDataSourceCompactionConfig = dataSourceCompactionConfig;
}
}
Assert.assertNull(foundDataSourceCompactionConfig);
}
private void forceTriggerAutoCompaction() throws Exception
{
compactionResource.forceTriggerAutoCompaction();
waitForAllTasksToComplete();
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
}
private void verifySegmentsCount(int numExpectedSegments)
{
ITRetryUtil.retryUntilTrue(
() -> {
int metadataSegmentCount = coordinator.getSegments(fullDatasourceName).size();
LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
return metadataSegmentCount == numExpectedSegments;
},
"Compaction segment count check"
);
}
private void checkCompactionIntervals(List<String> expectedIntervals)
{
ITRetryUtil.retryUntilTrue(
() -> {
final List<String> actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName);
actualIntervals.sort(null);
return actualIntervals.equals(expectedIntervals);
},
"Compaction interval check"
);
}
private void verifySegmentsCompacted(int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment)
{
List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
List<DataSegment> foundCompactedSegments = new ArrayList<>();
for (DataSegment segment : segments) {
if (segment.getLastCompactionState() != null) {
foundCompactedSegments.add(segment);
}
}
Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount);
for (DataSegment compactedSegment : foundCompactedSegments) {
Assert.assertNotNull(compactedSegment.getLastCompactionState());
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getMaxRowsPerSegment(),
expectedMaxRowsPerSegment);
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
SecondaryPartitionType.LINEAR
);
}
}
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots) throws Exception
{
compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
}
}

View File

@ -513,6 +513,13 @@ public class DruidCoordinator
}
}
public void runCompactSegmentsDuty()
{
final int startingLeaderCounter = coordLeaderSelector.localTerm();
DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter);
compactSegmentsDuty.run();
}
private void becomeLeader()
{
synchronized (lock) {
@ -611,7 +618,7 @@ public class DruidCoordinator
{
List<CoordinatorDuty> duties = new ArrayList<>();
duties.add(new LogUsedSegments());
duties.add(compactSegments);
duties.addAll(makeCompactSegmentsDuty());
duties.addAll(indexingServiceDuties);
log.debug(
@ -621,13 +628,18 @@ public class DruidCoordinator
return ImmutableList.copyOf(duties);
}
public class DutiesRunnable implements Runnable
private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);
}
private class DutiesRunnable implements Runnable
{
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final int startingLeaderCounter;
protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
private DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/druid/coordinator/v1/compaction")
public class CompactionResource
{
private final DruidCoordinator coordinator;
@Inject
public CompactionResource(
DruidCoordinator coordinator
)
{
this.coordinator = coordinator;
}
/**
* This API is meant to only be used by Druid's integration tests.
*/
@POST
@Path("/compact")
@ResourceFilters(ConfigResourceFilter.class)
@VisibleForTesting
public Response forceTriggerCompaction()
{
coordinator.runCompactSegmentsDuty();
return Response.ok().build();
}
@GET
@Path("/progress")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getCompactionProgress(
@QueryParam("dataSource") String dataSource
)
{
final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
if (notCompactedSegmentSizeBytes == null) {
return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
} else {
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
}
}
}

View File

@ -36,7 +36,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.util.Map;
/**
@ -161,19 +160,4 @@ public class CoordinatorResource
)
).build();
}
@GET
@Path("/remainingSegmentSizeForCompaction")
@Produces(MediaType.APPLICATION_JSON)
public Response getTotalSizeOfSegmentsAwaitingCompaction(
@QueryParam("dataSource") String dataSource
)
{
final Long notCompactedSegmentSizeBytes = coordinator.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
if (notCompactedSegmentSizeBytes == null) {
return Response.status(Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
} else {
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.StatusResource;
import org.apache.druid.server.http.BrokerResource;
import org.apache.druid.server.http.CompactionResource;
import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
import org.apache.druid.server.http.CoordinatorResource;
import org.apache.druid.server.http.DataSourcesResource;
@ -72,7 +73,8 @@ public class SecurityResourceFilterTest extends ResourceFilterTestHelper
getRequestPathsWithAuthorizer(StatusResource.class),
getRequestPathsWithAuthorizer(SelfDiscoveryResource.class),
getRequestPathsWithAuthorizer(BrokerQueryResource.class),
getRequestPathsWithAuthorizer(RouterResource.class)
getRequestPathsWithAuthorizer(RouterResource.class),
getRequestPathsWithAuthorizer(CompactionResource.class)
)
);
}

View File

@ -72,6 +72,7 @@ import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;
import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
import org.apache.druid.server.http.CoordinatorRedirectInfo;
@ -196,6 +197,7 @@ public class CliCoordinator extends ServerRunnable
.to(CoordinatorJettyServerInitializer.class);
Jerseys.addResource(binder, CoordinatorResource.class);
Jerseys.addResource(binder, CompactionResource.class);
Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
Jerseys.addResource(binder, CoordinatorCompactionConfigsResource.class);
Jerseys.addResource(binder, TiersResource.class);