When inserting segments, mark unused if already overshadowed. (#3499)

This is useful for the insert-segment-to-db tool, which could otherwise
insert many already-overshadowed segments as "used", causing needless
load and drop churn in the cluster.
This commit is contained in:
Gian Merlino 2016-10-10 18:10:18 -07:00 committed by Fangjin Yang
parent 6f21778364
commit ddc856214d
4 changed files with 130 additions and 47 deletions

View File

@ -38,6 +38,7 @@ public class JodaUtils
// limit intervals such that duration millis fits in a long
public static final long MAX_INSTANT = Long.MAX_VALUE / 2;
public static final long MIN_INSTANT = Long.MIN_VALUE / 2;
public static final Interval ETERNITY = new Interval(MIN_INSTANT, MAX_INSTANT);
public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals)
{

View File

@ -21,6 +21,7 @@ package io.druid.timeline;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
@ -59,8 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class VersionedIntervalTimeline<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{
private static final Logger log = new Logger(VersionedIntervalTimeline.class);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
final NavigableMap<Interval, TimelineEntry> completePartitionsTimeline = new TreeMap<Interval, TimelineEntry>(
@ -80,6 +79,15 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
this.versionComparator = versionComparator;
}
/**
 * Builds a timeline of {@link DataSegment}s keyed by String version.
 *
 * Every supplied segment is added under its own interval and version, wrapped in the partition
 * chunk created by its shard spec. Versions are compared using their natural ordering.
 *
 * @param segments segments to add to the new timeline
 *
 * @return a newly created timeline to which each of the supplied segments has been added
 */
public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
{
  final VersionedIntervalTimeline<String, DataSegment> result = new VersionedIntervalTimeline<>(Ordering.natural());
  for (final DataSegment dataSegment : segments) {
    final Interval interval = dataSegment.getInterval();
    final String version = dataSegment.getVersion();
    final PartitionChunk<DataSegment> chunk = dataSegment.getShardSpec().createChunk(dataSegment);
    result.add(interval, version, chunk);
  }
  return result;
}
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
try {
@ -293,10 +301,10 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
}
/**
*
* @param timeline
* @param key
* @param entry
*
* @return boolean flag indicating whether or not we inserted or discarded something
*/
private boolean addAtKey(

View File

@ -37,6 +37,7 @@ import com.metamx.common.ISE;
import com.metamx.common.StringUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.SegmentPublishResult;
@ -304,6 +305,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
}
// Find which segments are used (i.e. not overshadowed).
final Set<DataSegment> usedSegments = Sets.newHashSet();
for (TimelineObjectHolder<String, DataSegment> holder : VersionedIntervalTimeline.forSegments(segments)
.lookup(JodaUtils.ETERNITY)) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
usedSegments.add(chunk.getObject());
}
}
final AtomicBoolean txnFailure = new AtomicBoolean(false);
try {
@ -334,7 +344,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
for (final DataSegment segment : segments) {
if (announceHistoricalSegment(handle, segment)) {
if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
inserted.add(segment);
}
}
@ -575,7 +585,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
*
* @return true if the segment was added, false if it already existed
*/
private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
private boolean announceHistoricalSegment(
final Handle handle,
final DataSegment segment,
final boolean used
) throws IOException
{
try {
if (segmentExists(handle, segment)) {
@ -600,7 +614,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("used", used)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.execute();

View File

@ -34,11 +34,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class IndexerSQLMetadataStorageCoordinatorTest
@ -83,7 +84,20 @@ public class IndexerSQLMetadataStorageCoordinatorTest
100
);
private final Set<DataSegment> segments = ImmutableSet.of(defaultSegment, defaultSegment2);
// Overshadows defaultSegment, defaultSegment2
private final DataSegment defaultSegment4 = new DataSegment(
"fooDataSource",
Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
"zversion",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
private final Set<DataSegment> SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
IndexerSQLMetadataStorageCoordinator coordinator;
private TestDerbyConnector derbyConnector;
@ -104,7 +118,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
private void unUseSegment()
{
for (final DataSegment segment : segments) {
for (final DataSegment segment : SEGMENTS) {
Assert.assertEquals(
1, (int) derbyConnector.getDBI().<Integer>withHandle(
new HandleCallback<Integer>()
@ -125,21 +139,67 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
}
/**
 * Reads the ids of all rows in the segments table that are flagged as used.
 *
 * @return ids of the rows where {@code used = true}, ordered by id
 */
private List<String> getUsedIdentifiers()
{
  final String segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
  final HandleCallback<List<String>> fetchUsedIds = new HandleCallback<List<String>>()
  {
    @Override
    public List<String> withHandle(Handle handle) throws Exception
    {
      // The table name comes from the test's metadata config, not from external input.
      final String sql = "SELECT id FROM " + segmentsTable + " WHERE used = true ORDER BY id";
      return handle.createQuery(sql).map(StringMapper.FIRST).list();
    }
  };
  return derbyConnector.retryWithHandle(fetchUsedIds);
}
@Test
public void testSimpleAnnounce() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
for (DataSegment segment : SEGMENTS) {
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
mapper.writeValueAsString(segment).getBytes("UTF-8"),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
defaultSegment.getIdentifier()
segment.getIdentifier()
)
);
}
Assert.assertEquals(
ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()),
getUsedIdentifiers()
);
}
/**
 * Announces two segments together with a third one that overshadows them, then checks that
 * every payload was stored and that only the overshadowing segment is flagged as used.
 */
@Test
public void testOvershadowingAnnounce() throws IOException
{
  final ImmutableSet<DataSegment> announced = ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment4);
  coordinator.announceHistoricalSegments(announced);

  // All three payloads must be present, whether or not the segment ended up used.
  for (DataSegment announcedSegment : announced) {
    final byte[] expectedPayload = mapper.writeValueAsString(announcedSegment).getBytes("UTF-8");
    final byte[] storedPayload = derbyConnector.lookup(
        derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
        "id",
        "payload",
        announcedSegment.getIdentifier()
    );
    Assert.assertArrayEquals(expectedPayload, storedPayload);
  }

  // Only the overshadowing segment should be marked as used.
  final List<String> expectedUsed = ImmutableList.of(defaultSegment4.getIdentifier());
  Assert.assertEquals(expectedUsed, getUsedIdentifiers());
}
@Test
public void testTransactionalAnnounceSuccess() throws IOException
{
@ -236,9 +296,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testSimpleUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -251,11 +311,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testMultiIntervalUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3));
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForIntervals(
defaultSegment.getDataSource(),
@ -300,10 +360,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testSimpleUnUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -317,7 +377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedOverlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Set<DataSegment> actualSegments = ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -325,7 +385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
)
);
Assert.assertEquals(
segments,
SEGMENTS,
actualSegments
);
}
@ -334,9 +394,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedOverlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -349,7 +409,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedOutOfBoundsLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertTrue(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -362,7 +422,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedOutOfBoundsHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertTrue(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -374,9 +434,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedWithinBoundsEnd() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -389,9 +449,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUsedOverlapEnd() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -405,7 +465,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedOverlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
@ -421,7 +481,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedUnderlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
@ -435,7 +495,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedUnderlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
@ -448,7 +508,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedOverlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
@ -461,10 +521,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedBigOverlap() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -477,10 +537,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedLowRange() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -489,7 +549,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
)
);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -502,10 +562,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
@Test
public void testUnUsedHighRange() throws IOException
{
coordinator.announceHistoricalSegments(segments);
coordinator.announceHistoricalSegments(SEGMENTS);
unUseSegment();
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
@ -514,7 +574,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
)
);
Assert.assertEquals(
segments,
SEGMENTS,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),