Reduce interval creation cost for segment cost computation (#12670)

Changes:
- Reuse the created interval in `SegmentId.getInterval()` instead of constructing a new one on each call (see the sketch after this list)
- Intern intervals to reduce the memory footprint
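
The first bullet boils down to caching the Interval on the id object and handing it back directly, rather than rebuilding it from flattened start/end/chronology fields on every call. A minimal sketch of that pattern, using a hypothetical class name rather than the real SegmentId:

// Sketch only (hypothetical CachedIntervalId, not the real SegmentId):
// the id keeps one Interval instance and returns it directly, instead of
// allocating a new Interval on every getInterval() call.
import org.joda.time.Interval;

final class CachedIntervalId
{
  private final Interval interval; // built once, reused by every getInterval() call

  CachedIntervalId(Interval interval)
  {
    this.interval = interval;
  }

  Interval getInterval()
  {
    return interval; // no per-call allocation
  }
}
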
AmatyaAvadhanula 2022-06-21 17:39:43 +05:30 committed by GitHub
parent a85b1d8985
commit eccdec9139
3 changed files with 26 additions and 28 deletions

SegmentId.java

@@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -80,6 +79,12 @@ public final class SegmentId implements Comparable<SegmentId>
*/
private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
+/**
+ * Store the Interval and reuse it, since creating a new Interval before returning it on each call is expensive.
+ * To decrease the memory required for storing intervals, intern them, since the number of distinct values is low.
+ */
+private static final Interner<Interval> INTERVAL_INTERNER = Interners.newWeakInterner();
private static final char DELIMITER = '_';
private static final Splitter DELIMITER_SPLITTER = Splitter.on(DELIMITER);
private static final Joiner DELIMITER_JOINER = Joiner.on(DELIMITER);
@@ -258,14 +263,7 @@ public final class SegmentId implements Comparable<SegmentId>
}
private final String dataSource;
-/**
- * {@code intervalStartMillis}, {@link #intervalEndMillis} and {@link #intervalChronology} are the three fields of
- * an {@link Interval}. Storing them directly to flatten the structure and reduce the heap space consumption.
- */
-private final long intervalStartMillis;
-private final long intervalEndMillis;
-@Nullable
-private final Chronology intervalChronology;
+private final Interval interval;
private final String version;
private final int partitionNum;
@@ -278,9 +276,7 @@ public final class SegmentId implements Comparable<SegmentId>
private SegmentId(String dataSource, Interval interval, String version, int partitionNum)
{
this.dataSource = STRING_INTERNER.intern(Objects.requireNonNull(dataSource));
-this.intervalStartMillis = interval.getStartMillis();
-this.intervalEndMillis = interval.getEndMillis();
-this.intervalChronology = interval.getChronology();
+this.interval = INTERVAL_INTERNER.intern(Objects.requireNonNull(interval));
// Versions are timestamp-based Strings, interning of them doesn't make sense. If this is not the case, interning
// could be conditionally allowed via a system property.
this.version = Objects.requireNonNull(version);
@@ -297,9 +293,7 @@ public final class SegmentId implements Comparable<SegmentId>
hashCode = hashCode * 1000003 + version.hashCode();
hashCode = hashCode * 1000003 + dataSource.hashCode();
-hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
-hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
-hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
+hashCode = hashCode * 1000003 + interval.hashCode();
return hashCode;
}
@@ -310,17 +304,17 @@ public final class SegmentId implements Comparable<SegmentId>
public DateTime getIntervalStart()
{
-return new DateTime(intervalStartMillis, intervalChronology);
+return new DateTime(interval.getStartMillis(), interval.getChronology());
}
public DateTime getIntervalEnd()
{
-return new DateTime(intervalEndMillis, intervalChronology);
+return new DateTime(interval.getEndMillis(), interval.getChronology());
}
public Interval getInterval()
{
-return new Interval(intervalStartMillis, intervalEndMillis, intervalChronology);
+return interval;
}
public String getVersion()
@@ -340,7 +334,7 @@ public final class SegmentId implements Comparable<SegmentId>
public SegmentDescriptor toDescriptor()
{
-return new SegmentDescriptor(Intervals.utc(intervalStartMillis, intervalEndMillis), version, partitionNum);
+return new SegmentDescriptor(Intervals.utc(interval.getStartMillis(), interval.getEndMillis()), version, partitionNum);
}
@Override
@@ -357,9 +351,7 @@ public final class SegmentId implements Comparable<SegmentId>
// are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.
return hashCode == that.hashCode &&
dataSource.equals(that.dataSource) &&
-intervalStartMillis == that.intervalStartMillis &&
-intervalEndMillis == that.intervalEndMillis &&
-Objects.equals(intervalChronology, that.intervalChronology) &&
+interval.equals(that.interval) &&
version.equals(that.version);
}
@@ -376,11 +368,11 @@ public final class SegmentId implements Comparable<SegmentId>
if (result != 0) {
return result;
}
-result = Long.compare(intervalStartMillis, o.intervalStartMillis);
+result = Long.compare(interval.getStartMillis(), o.interval.getStartMillis());
if (result != 0) {
return result;
}
-result = Long.compare(intervalEndMillis, o.intervalEndMillis);
+result = Long.compare(interval.getEndMillis(), o.interval.getEndMillis());
if (result != 0) {
return result;
}
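
The INTERVAL_INTERNER added above is a Guava weak interner. A self-contained sketch of how interning deduplicates equal Interval instances, assuming Guava and Joda-Time on the classpath (the class name and sample interval string are illustrative, not taken from the commit):

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.joda.time.Interval;

public class IntervalInterningSketch
{
  // Canonical instances are held weakly, so unused entries can be garbage collected.
  private static final Interner<Interval> INTERVAL_INTERNER = Interners.newWeakInterner();

  public static void main(String[] args)
  {
    // Two equal but distinct Interval objects, as would be parsed from many segment ids.
    Interval a = INTERVAL_INTERNER.intern(Interval.parse("2022-01-01/2022-01-02"));
    Interval b = INTERVAL_INTERNER.intern(Interval.parse("2022-01-01/2022-01-02"));

    // The interner hands back one canonical instance per distinct value,
    // so equal intervals share a single object on the heap.
    System.out.println(a == b); // true
  }
}

Because the number of distinct segment intervals is comparatively low, the interner keeps roughly one Interval per distinct value instead of one per SegmentId.
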

StreamAppenderatorDriverFailTest.java

@@ -60,10 +60,10 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -355,7 +355,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
private static class FailableAppenderator implements Appenderator
{
-private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new HashMap<>();
+private final Map<SegmentIdWithShardSpec, List<InputRow>> rows = new TreeMap<>();
private boolean dropEnabled = true;
private boolean persistEnabled = true;

SegmentLoadDropHandlerTest.java

@@ -57,8 +57,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
@@ -501,9 +503,13 @@ public class SegmentLoadDropHandlerTest
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
.processBatch(batch);
+Map<DataSegmentChangeRequest, SegmentLoadDropHandler.Status> expectedStatusMap = new HashMap<>();
+expectedStatusMap.put(batch.get(0), SegmentLoadDropHandler.Status.PENDING);
+expectedStatusMap.put(batch.get(1), SegmentLoadDropHandler.Status.SUCCESS);
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = future.get();
-Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING, result.get(0).getStatus());
-Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
+for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus requestAndStatus : result) {
+  Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus());
+}
for (Runnable runnable : scheduledRunnable) {
runnable.run();