maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7368)

* maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes

* remove unnecessary default values

* remove flaky test

* fix build

* Add comments
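
In short, targetCompactionSizeBytes can no longer be combined with an explicit partitioning setting (maxRowsPerSegment or the tuningConfig's maxTotalRows), and the default target size is applied only when neither partitioning setting is given. A minimal standalone sketch of that check, assuming a placeholder default value (the real logic is in the getValidTargetCompactionSizeBytes and hasPartitionConfig helpers added to DataSourceCompactionConfig in the diff below):

public class TargetCompactionSizeCheckSketch
{
  // Placeholder value for illustration only; the actual default lives in DataSourceCompactionConfig.
  private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400L * 1024 * 1024;

  // Mirrors hasPartitionConfig: either setting counts as an explicit partitioning config.
  static boolean hasPartitionConfig(Integer maxRowsPerSegment, Integer maxTotalRows)
  {
    return maxRowsPerSegment != null || maxTotalRows != null;
  }

  // Mirrors getValidTargetCompactionSizeBytes: reject conflicting settings and fall back to the
  // default only when no partitioning config is present.
  static Long getValidTargetCompactionSizeBytes(
      Long targetCompactionSizeBytes,
      Integer maxRowsPerSegment,
      Integer maxTotalRows
  )
  {
    if (targetCompactionSizeBytes != null) {
      if (hasPartitionConfig(maxRowsPerSegment, maxTotalRows)) {
        throw new IllegalArgumentException(
            "targetCompactionSizeBytes[" + targetCompactionSizeBytes + "] cannot be used with maxRowsPerSegment["
            + maxRowsPerSegment + "] and maxTotalRows[" + maxTotalRows + "]"
        );
      }
      return targetCompactionSizeBytes;
    }
    return hasPartitionConfig(maxRowsPerSegment, maxTotalRows) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
  }

  public static void main(String[] args)
  {
    // No partitioning config: the default target size is applied.
    System.out.println(getValidTargetCompactionSizeBytes(null, null, null));
    // maxTotalRows is set: targetCompactionSizeBytes stays null instead of defaulting.
    System.out.println(getValidTargetCompactionSizeBytes(null, null, 20_000_000));
    // Both are set: rejected, matching the new tests in DataSourceCompactionConfigTest.
    try {
      getValidTargetCompactionSizeBytes(10_000L, null, 10_000);
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}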
Jihoon Son 2019-03-28 20:25:10 -07:00 committed by Jonathan Wei
parent a09aa13ead
commit 62c3e89266
14 changed files with 250 additions and 66 deletions


@ -64,7 +64,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest
try {
LOG.info("indexerFile name: [%s]", BATCH_TASK);
indexerSpec = getTaskAsString(BATCH_TASK);
indexerSpec = getResourceAsString(BATCH_TASK);
indexerSpec = StringUtils.replace(indexerSpec, "%%HADOOP_TEST_PATH%%", hadoopDir);
}
catch (Exception e) {


@ -56,7 +56,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
{
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
final String taskSpec = StringUtils.replace(
getTaskAsString(indexTaskFilePath),
getResourceAsString(indexTaskFilePath),
"%%DATASOURCE%%",
fullDatasourceName
);
@ -98,7 +98,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
String taskSpec = StringUtils.replace(
getTaskAsString(reindexTaskFilePath),
getResourceAsString(reindexTaskFilePath),
"%%DATASOURCE%%",
fullBaseDatasourceName
);
@ -148,7 +148,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
{
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
final String taskSpec = StringUtils.replace(
getTaskAsString(indexTaskFilePath),
getResourceAsString(indexTaskFilePath),
"%%DATASOURCE%%",
fullDatasourceName
);


@ -88,7 +88,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
try (final Closeable closeable = unloader(fullDatasourceName)) {
// the task will run for 3 minutes and then shutdown itself
String task = setShutOffTime(
getTaskAsString(getTaskResource()),
getResourceAsString(getTaskResource()),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);


@ -112,7 +112,7 @@ public abstract class AbstractIndexerTest
);
}
protected String getTaskAsString(String file) throws IOException
protected String getResourceAsString(String file) throws IOException
{
final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
try {


@ -138,7 +138,7 @@ public class AbstractKafkaIndexerTest extends AbstractIndexerTest
consumerProperties.putAll(consumerConfigs);
consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
spec = getTaskAsString(INDEXER_FILE);
spec = getResourceAsString(INDEXER_FILE);
spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));


@ -131,7 +131,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void loadData() throws Exception
{
String taskSpec = getTaskAsString(INDEX_TASK);
String taskSpec = getResourceAsString(INDEX_TASK);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
@ -145,7 +145,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void compactData(boolean keepSegmentGranularity) throws Exception
{
final String template = getTaskAsString(COMPACTION_TASK);
final String template = getResourceAsString(COMPACTION_TASK);
String taskSpec =
StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);


@ -96,7 +96,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
private void loadData() throws Exception
{
String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
String taskSpec = getResourceAsString(WIKITICKER_INDEX_TASK);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);


@ -93,7 +93,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
try {
// Load 4 datasources with same dimensions
String task = setShutOffTime(
getTaskAsString(UNION_TASK_RESOURCE),
getResourceAsString(UNION_TASK_RESOURCE),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
List<String> taskIDs = new ArrayList<>();


@ -99,4 +99,17 @@ public class ClientCompactQuery implements ClientQuery
{
return context;
}
@Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", context=" + context +
'}';
}
}


@ -29,20 +29,18 @@ import java.util.Objects;
public class ClientCompactQueryTuningConfig
{
// These default values should be synchronized with those of IndexTuningConfig
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final long DEFAULT_PUSH_TIMEOUT = 0;
@Nullable
private final Integer maxRowsPerSegment;
private final int maxRowsInMemory;
private final int maxTotalRows;
@Nullable
private final Integer maxRowsInMemory;
@Nullable
private final Integer maxTotalRows;
@Nullable
private final IndexSpec indexSpec;
private final int maxPendingPersists;
private final long pushTimeout;
@Nullable
private final Integer maxPendingPersists;
@Nullable
private final Long pushTimeout;
public static ClientCompactQueryTuningConfig from(
@Nullable UserCompactTuningConfig userCompactTuningConfig,
@ -70,11 +68,11 @@ public class ClientCompactQueryTuningConfig
)
{
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
this.maxRowsInMemory = maxRowsInMemory;
this.maxTotalRows = maxTotalRows;
this.indexSpec = indexSpec;
this.maxPendingPersists = maxPendingPersists;
this.pushTimeout = pushTimeout;
}
@JsonProperty
@ -91,31 +89,36 @@ public class ClientCompactQueryTuningConfig
}
@JsonProperty
public int getMaxRowsInMemory()
@Nullable
public Integer getMaxRowsInMemory()
{
return maxRowsInMemory;
}
@JsonProperty
public int getMaxTotalRows()
@Nullable
public Integer getMaxTotalRows()
{
return maxTotalRows;
}
@JsonProperty
@Nullable
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public int getMaxPendingPersists()
@Nullable
public Integer getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty
public long getPushTimeout()
@Nullable
public Long getPushTimeout()
{
return pushTimeout;
}
@ -130,25 +133,18 @@ public class ClientCompactQueryTuningConfig
return false;
}
ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
return maxRowsInMemory == that.maxRowsInMemory &&
maxTotalRows == that.maxTotalRows &&
maxPendingPersists == that.maxPendingPersists &&
pushTimeout == that.pushTimeout &&
Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(indexSpec, that.indexSpec);
return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(maxRowsInMemory, that.maxRowsInMemory) &&
Objects.equals(maxTotalRows, that.maxTotalRows) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(maxPendingPersists, that.maxPendingPersists) &&
Objects.equals(pushTimeout, that.pushTimeout);
}
@Override
public int hashCode()
{
return Objects.hash(
maxRowsPerSegment,
maxRowsInMemory,
maxTotalRows,
indexSpec,
maxPendingPersists,
pushTimeout
);
return Objects.hash(maxRowsPerSegment, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout);
}
@Override


@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
@ -71,7 +72,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
boolean keepSegmentGranularity,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
)
{
@ -103,14 +104,20 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
{
try {
final FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(
HttpMethod.POST,
"/druid/indexer/v1/task"
).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")
.setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE("Failed to post task[%s]", taskObject);
if (!Strings.isNullOrEmpty(response.getContent())) {
throw new ISE(
"Failed to post task[%s] with error[%s].",
taskObject,
response.getContent()
);
} else {
throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject);
}
}
final Map<String, Object> resultMap = jsonMapper.readValue(
@ -132,10 +139,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
final FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(
HttpMethod.POST,
StringUtils.format(
"/druid/indexer/v1/task/%s/shutdown",
StringUtils.urlEncode(taskId)
)
StringUtils.format("/druid/indexer/v1/task/%s/shutdown", StringUtils.urlEncode(taskId))
)
);
@ -267,7 +271,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
{
try {
final FullResponseHolder responseHolder = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
druidLeaderClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId))
)
);
return jsonMapper.readValue(

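The HttpIndexingServiceClient change above also url-encodes the task id before building the task-status path. A rough standalone sketch of why that matters; URLEncoder stands in for Druid's StringUtils.urlEncode only to keep the example self-contained, and the task id shown is made up:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class TaskStatusPathSketch
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical task id; real ids routinely contain ':' from embedded timestamps.
    final String taskId = "compact_wikipedia_2019-03-29T00:00:00.000Z";

    // Encode the id before splicing it into the request path, as the diff does with
    // StringUtils.urlEncode; the raw id is not safe to use as-is.
    final String encoded = URLEncoder.encode(taskId, StandardCharsets.UTF_8.name());

    System.out.println("/druid/indexer/v1/task/" + encoded);
    // -> /druid/indexer/v1/task/compact_wikipedia_2019-03-29T00%3A00%3A00.000Z
  }
}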

@ -71,10 +71,6 @@ public class DataSourceCompactionConfig
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
Preconditions.checkArgument(
targetCompactionSizeBytes == null || maxRowsPerSegment == null,
"targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together"
);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
@ -85,11 +81,11 @@ public class DataSourceCompactionConfig
this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
: inputSegmentSizeBytes;
if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) {
this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
} else {
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
}
this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(
targetCompactionSizeBytes,
maxRowsPerSegment,
tuningConfig
);
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
? DEFAULT_NUM_INPUT_SEGMENTS
@ -104,6 +100,51 @@ public class DataSourceCompactionConfig
);
}
/**
* This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this
* method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}.
*
* Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
* the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}.
*/
@Nullable
private static Long getValidTargetCompactionSizeBytes(
@Nullable Long targetCompactionSizeBytes,
@Nullable Integer maxRowsPerSegment,
@Nullable UserCompactTuningConfig tuningConfig
)
{
if (targetCompactionSizeBytes != null) {
Preconditions.checkArgument(
!hasPartitionConfig(maxRowsPerSegment, tuningConfig),
"targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
targetCompactionSizeBytes,
maxRowsPerSegment,
tuningConfig == null ? null : tuningConfig.getMaxTotalRows()
);
return targetCompactionSizeBytes;
} else {
return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
}
}
/**
* This method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are
* 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and
* 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by
* {@link UserCompactTuningConfig}.
*
* Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
* the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}.
*/
private static boolean hasPartitionConfig(
@Nullable Integer maxRowsPerSegment,
@Nullable UserCompactTuningConfig tuningConfig
)
{
return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null);
}
@JsonProperty
public String getDataSource()
{


@ -175,7 +175,6 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
final List<DataSegment> segmentsToCompact = iterator.next();
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
if (segmentsToCompact.size() > 1) {


@ -25,7 +25,9 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
@ -33,6 +35,9 @@ public class DataSourceCompactionConfigTest
{
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerdeBasic() throws IOException
{
@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@ -85,6 +91,7 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@ -101,4 +108,125 @@ public class DataSourceCompactionConfigTest
final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class);
Assert.assertEquals(config, fromJson);
}
@Test
public void testSerdeWithMaxTotalRows() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
null,
500L,
null,
null,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
10000,
null,
null,
null
),
ImmutableMap.of("key", "val")
);
final String json = objectMapper.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertTrue(fromJson.isKeepSegmentGranularity());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
}
@Test
public void testTargetCompactionSizeBytesWithMaxRowsPerSegment()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]"
);
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
null,
500L,
10000L,
1000,
20,
new Period(3600),
null,
ImmutableMap.of("key", "val")
);
}
@Test
public void testTargetCompactionSizeBytesWithMaxTotalRows()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]"
);
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
null,
500L,
10000L,
null,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
10000,
null,
null,
null
),
ImmutableMap.of("key", "val")
);
}
@Test
public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
null,
500L,
null,
10000,
20,
new Period(3600),
new UserCompactTuningConfig(
null,
10000,
null,
null,
null
),
ImmutableMap.of("key", "val")
);
final String json = objectMapper.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertTrue(fromJson.isKeepSegmentGranularity());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
}
}