Do not use UnmodifiableList in auto compaction (#9535)

This commit is contained in:
Jihoon Son 2020-03-19 11:43:33 -07:00 committed by GitHub
parent 68013fbc64
commit 1e667362eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 210 additions and 95 deletions

View File

@ -231,7 +231,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
);
if (!responseHolder.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE("Error while fetching the status of the last complete task");
throw new ISE("Error while fetching the status of tasks");
}
return jsonMapper.readValue(

View File

@ -41,6 +41,7 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -240,9 +241,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (holders.isEmpty()) {
throw new NoSuchElementException();
}
return FluentIterable.from(holders.remove(holders.size() - 1).getObject())
.transform(PartitionChunk::getObject)
.toList();
return Streams.sequentialStreamFrom(holders.remove(holders.size() - 1).getObject())
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
}
}

View File

@ -19,20 +19,32 @@
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
@ -45,6 +57,13 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.Streams;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -52,102 +71,21 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class CompactSegmentsTest
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";
private final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
{
private int compactVersionSuffix = 0;
private int idSuffix = 0;
@Override
public String compactSegments(
List<DataSegment> segments,
int compactionTaskPriority,
ClientCompactionTaskQueryTuningConfig tuningConfig,
Map<String, Object> context
)
{
Preconditions.checkArgument(segments.size() > 1);
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
Interval compactInterval = new Interval(minStart, maxEnd);
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
segments.forEach(
segment -> timeline.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
);
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
for (int i = 0; i < 2; i++) {
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
version,
null,
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
new NumberedShardSpec(i, 0),
new CompactionState(
new DynamicPartitionsSpec(
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "concise"),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
),
1,
segmentSize
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
return "task_" + idSuffix++;
}
@Override
public List<TaskStatusPlus> getActiveTasks()
{
return Collections.emptyList();
}
@Override
public int getTotalWorkerCapacity()
{
return 10;
}
};
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
@Before
@ -156,7 +94,7 @@ public class CompactSegmentsTest
List<DataSegment> segments = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
segments.add(createSegment(dataSource, j, true, k));
segments.add(createSegment(dataSource, j, false, k));
@ -202,7 +140,11 @@ public class CompactSegmentsTest
@Test
public void testRun()
{
final CompactSegments compactSegments = new CompactSegments(new DefaultObjectMapper(), indexingServiceClient);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(jsonMapper);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(jsonMapper, leaderClient);
final CompactSegments compactSegments = new CompactSegments(jsonMapper, indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@ -408,4 +350,176 @@ public class CompactSegmentsTest
}
return compactionConfigs;
}
private class TestDruidLeaderClient extends DruidLeaderClient
{
private final ObjectMapper jsonMapper;
private int compactVersionSuffix = 0;
private int idSuffix = 0;
private TestDruidLeaderClient(ObjectMapper jsonMapper)
{
super(null, new TestNodeDiscoveryProvider(), null, null);
this.jsonMapper = jsonMapper;
}
@Override
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{
return new Request(httpMethod, new URL("http", "host", 8090, urlPath));
}
@Override
public StringFullResponseHolder go(Request request) throws IOException
{
final String urlString = request.getUrl().toString();
if (urlString.contains("/druid/indexer/v1/task")) {
return handleTask(request);
} else if (urlString.contains("/druid/indexer/v1/workers")) {
return handleWorkers();
} else if (urlString.contains("/druid/indexer/v1/waitingTasks")
|| urlString.contains("/druid/indexer/v1/pendingTasks")
|| urlString.contains("/druid/indexer/v1/runningTasks")) {
return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
} else {
throw new IAE("Cannot handle request for url[%s]", request.getUrl());
}
}
private StringFullResponseHolder createStringFullResponseHolder(String content)
{
final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
final StringFullResponseHolder holder = new StringFullResponseHolder(
HttpResponseStatus.OK,
httpResponse,
StandardCharsets.UTF_8
);
holder.addChunk(content);
return holder;
}
private StringFullResponseHolder handleWorkers() throws JsonProcessingException
{
final List<IndexingWorkerInfo> workerInfos = new ArrayList<>();
// There are 10 workers available in this test
for (int i = 0; i < 10; i++) {
workerInfos.add(
new IndexingWorkerInfo(
new IndexingWorker("http", "host", "8091", 1, "version"),
0,
Collections.emptySet(),
Collections.emptyList(),
DateTimes.EPOCH,
null
)
);
}
return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
}
private StringFullResponseHolder handleTask(Request request) throws IOException
{
final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
if (!(taskQuery instanceof ClientCompactionTaskQuery)) {
throw new IAE("Cannot run non-compaction task");
}
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(
compactionTaskQuery.getDataSource()
);
final List<DataSegment> segments = timeline.lookup(intervalToCompact)
.stream()
.flatMap(holder -> Streams.sequentialStreamFrom(holder.getObject()))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
final String taskId = compactSegments(
timeline,
segments,
compactionTaskQuery.getTuningConfig()
);
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskId)));
}
private String compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
ClientCompactionTaskQueryTuningConfig tuningConfig
)
{
Preconditions.checkArgument(segments.size() > 1);
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
Interval compactInterval = new Interval(minStart, maxEnd);
segments.forEach(
segment -> timeline.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
);
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
for (int i = 0; i < 2; i++) {
DataSegment compactSegment = new DataSegment(
segments.get(0).getDataSource(),
compactInterval,
version,
null,
segments.get(0).getDimensions(),
segments.get(0).getMetrics(),
new NumberedShardSpec(i, 0),
new CompactionState(
new DynamicPartitionsSpec(
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "concise"),
"dimensionCompression",
"lz4",
"metricCompression",
"lz4",
"longEncoding",
"longs"
)
),
1,
segmentSize
);
timeline.add(
compactInterval,
compactSegment.getVersion(),
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
return "task_" + idSuffix++;
}
}
private static class TestNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
{
@Override
public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
throw new UnsupportedOperationException();
}
@Override
public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
{
return EasyMock.niceMock(DruidNodeDiscovery.class);
}
}
}