From ddc856214d0b1876759fe6eed71543f5156f6a64 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 10 Oct 2016 18:10:18 -0700
Subject: [PATCH] When inserting segments, mark unused if already overshadowed. (#3499)

This is useful for the insert-segment-to-db tool, which would otherwise
potentially insert a lot of overshadowed segments as "used", causing load
and drop churn in the cluster.
---
 .../java/io/druid/common/utils/JodaUtils.java |   1 +
 .../timeline/VersionedIntervalTimeline.java   |  16 +-
 .../IndexerSQLMetadataStorageCoordinator.java |  20 ++-
 ...exerSQLMetadataStorageCoordinatorTest.java | 140 +++++++++++++-----
 4 files changed, 130 insertions(+), 47 deletions(-)

diff --git a/common/src/main/java/io/druid/common/utils/JodaUtils.java b/common/src/main/java/io/druid/common/utils/JodaUtils.java
index d32fd6bf9a8..3874f145989 100644
--- a/common/src/main/java/io/druid/common/utils/JodaUtils.java
+++ b/common/src/main/java/io/druid/common/utils/JodaUtils.java
@@ -38,6 +38,7 @@ public class JodaUtils
   // limit intervals such that duration millis fits in a long
   public static final long MAX_INSTANT = Long.MAX_VALUE / 2;
   public static final long MIN_INSTANT = Long.MIN_VALUE / 2;
+  public static final Interval ETERNITY = new Interval(MIN_INSTANT, MAX_INSTANT);
 
   public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals)
   {
diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
index 743492fc1bd..cb6dcecc825 100644
--- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
+++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
@@ -21,6 +21,7 @@ package io.druid.timeline;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.metamx.common.guava.Comparators;
 import com.metamx.common.logger.Logger;
@@ -59,8 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class VersionedIntervalTimeline<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
 {
-  private static final Logger log = new Logger(VersionedIntervalTimeline.class);
-
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   final NavigableMap<Interval, TimelineEntry> completePartitionsTimeline = new TreeMap<Interval, TimelineEntry>(
@@ -80,6 +79,15 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
     this.versionComparator = versionComparator;
   }
 
+  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
+  {
+    VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
+    for (final DataSegment segment : segments) {
+      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+    }
+    return timeline;
+  }
+
   public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
   {
     try {
@@ -179,7 +187,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
    * @param interval interval to find objects for
    *
    * @return Holders representing the interval that the objects exist for, PartitionHolders
-   * are guaranteed to be complete
+   *         are guaranteed to be complete
    */
   public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
   {
@@ -293,10 +301,10 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
   }
 
   /**
-   *
    * @param timeline
    * @param key
    * @param entry
+   *
    * @return boolean flag indicating whether or not we inserted or discarded something
    */
   private boolean addAtKey(
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 6f4de4b2a33..fd8d291d75b 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -37,6 +37,7 @@ import com.metamx.common.ISE;
 import com.metamx.common.StringUtils;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.logger.Logger;
+import io.druid.common.utils.JodaUtils;
 import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import io.druid.indexing.overlord.SegmentPublishResult;
@@ -304,6 +305,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
     }
 
+    // Find which segments are used (i.e. not overshadowed).
+    final Set<DataSegment> usedSegments = Sets.newHashSet();
+    for (TimelineObjectHolder<String, DataSegment> holder : VersionedIntervalTimeline.forSegments(segments)
+                                                                                     .lookup(JodaUtils.ETERNITY)) {
+      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+        usedSegments.add(chunk.getObject());
+      }
+    }
+
     final AtomicBoolean txnFailure = new AtomicBoolean(false);
 
     try {
@@ -334,7 +344,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               }
 
               for (final DataSegment segment : segments) {
-                if (announceHistoricalSegment(handle, segment)) {
+                if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
                   inserted.add(segment);
                 }
               }
@@ -575,7 +585,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
    *
    * @return true if the segment was added, false if it already existed
    */
-  private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
+  private boolean announceHistoricalSegment(
+      final Handle handle,
+      final DataSegment segment,
+      final boolean used
+  ) throws IOException
   {
     try {
       if (segmentExists(handle, segment)) {
@@ -600,7 +614,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               .bind("end", segment.getInterval().getEnd().toString())
               .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
               .bind("version", segment.getVersion())
-              .bind("used", true)
+              .bind("used", used)
               .bind("payload", jsonMapper.writeValueAsBytes(segment))
               .execute();
 
diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 2b53428902e..5f4b0da759a 100644
--- a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -34,11 +34,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.StringMapper;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Set;
 
 public class IndexerSQLMetadataStorageCoordinatorTest
@@ -83,7 +84,20 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       100
   );
 
-  private final Set<DataSegment> segments = ImmutableSet.of(defaultSegment, defaultSegment2);
+  // Overshadows defaultSegment, defaultSegment2
+  private final DataSegment defaultSegment4 = new DataSegment(
+      "fooDataSource",
+      Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
+      "zversion",
+      ImmutableMap.<String, Object>of(),
+      ImmutableList.of("dim1"),
+      ImmutableList.of("m1"),
+      new LinearShardSpec(0),
+      9,
+      100
+  );
+
+  private final Set<DataSegment> SEGMENTS = ImmutableSet.of(defaultSegment, defaultSegment2);
 
   IndexerSQLMetadataStorageCoordinator coordinator;
   private TestDerbyConnector derbyConnector;
@@ -104,7 +118,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
 
   private void unUseSegment()
   {
-    for (final DataSegment segment : segments) {
+    for (final DataSegment segment : SEGMENTS) {
       Assert.assertEquals(
           1, (int) derbyConnector.getDBI().withHandle(
               new HandleCallback<Integer>()
@@ -125,21 +139,67 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     }
   }
 
+  private List<String> getUsedIdentifiers()
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        new HandleCallback<List<String>>()
+        {
+          @Override
+          public List<String> withHandle(Handle handle) throws Exception
+          {
+            return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id")
+                         .map(StringMapper.FIRST)
+                         .list();
+          }
+        }
+    );
+  }
+
   @Test
   public void testSimpleAnnounce() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
-    Assert.assertArrayEquals(
-        mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
-        derbyConnector.lookup(
-            derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
-            "id",
-            "payload",
-            defaultSegment.getIdentifier()
-        )
+    coordinator.announceHistoricalSegments(SEGMENTS);
+    for (DataSegment segment : SEGMENTS) {
+      Assert.assertArrayEquals(
+          mapper.writeValueAsString(segment).getBytes("UTF-8"),
+          derbyConnector.lookup(
+              derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
+              "id",
+              "payload",
+              segment.getIdentifier()
+          )
+      );
+    }
+
+    Assert.assertEquals(
+        ImmutableList.of(defaultSegment.getIdentifier(), defaultSegment2.getIdentifier()),
+        getUsedIdentifiers()
     );
   }
 
+  @Test
+  public void testOvershadowingAnnounce() throws IOException
+  {
+    final ImmutableSet<DataSegment> segments = ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment4);
+
+    coordinator.announceHistoricalSegments(segments);
+
+    for (DataSegment segment : segments) {
+      Assert.assertArrayEquals(
+          mapper.writeValueAsString(segment).getBytes("UTF-8"),
+          derbyConnector.lookup(
+              derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
+              "id",
+              "payload",
+              segment.getIdentifier()
+          )
+      );
+    }
+
+    Assert.assertEquals(ImmutableList.of(defaultSegment4.getIdentifier()), getUsedIdentifiers());
+  }
+
   @Test
   public void testTransactionalAnnounceSuccess() throws IOException
   {
@@ -236,9 +296,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testSimpleUsedList() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUsedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -251,11 +311,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testMultiIntervalUsedList() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3));
 
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUsedSegmentsForIntervals(
                defaultSegment.getDataSource(),
@@ -300,10 +360,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testSimpleUnUsedList() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -317,7 +377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedOverlapLow() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Set<DataSegment> actualSegments = ImmutableSet.copyOf(
         coordinator.getUsedSegmentsForInterval(
             defaultSegment.getDataSource(),
@@ -325,7 +385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         )
     );
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         actualSegments
     );
   }
@@ -334,9 +394,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedOverlapHigh() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUsedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -349,7 +409,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedOutOfBoundsLow() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertTrue(
         coordinator.getUsedSegmentsForInterval(
             defaultSegment.getDataSource(),
@@ -362,7 +422,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedOutOfBoundsHigh() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertTrue(
         coordinator.getUsedSegmentsForInterval(
             defaultSegment.getDataSource(),
@@ -374,9 +434,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedWithinBoundsEnd() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUsedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -389,9 +449,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUsedOverlapEnd() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUsedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -405,7 +465,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedOverlapLow() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertTrue(
         coordinator.getUnusedSegmentsForInterval(
@@ -421,7 +481,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedUnderlapLow() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertTrue(
         coordinator.getUnusedSegmentsForInterval(
@@ -435,7 +495,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedUnderlapHigh() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertTrue(
         coordinator.getUnusedSegmentsForInterval(
@@ -448,7 +508,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedOverlapHigh() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertTrue(
         coordinator.getUnusedSegmentsForInterval(
@@ -461,10 +521,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedBigOverlap() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -477,10 +537,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedLowRange() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -489,7 +549,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         )
     );
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -502,10 +562,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   @Test
   public void testUnUsedHighRange() throws IOException
   {
-    coordinator.announceHistoricalSegments(segments);
+    coordinator.announceHistoricalSegments(SEGMENTS);
     unUseSegment();
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
@@ -514,7 +574,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         )
     );
     Assert.assertEquals(
-        segments,
+        SEGMENTS,
         ImmutableSet.copyOf(
            coordinator.getUnusedSegmentsForInterval(
                defaultSegment.getDataSource(),
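
Note (not part of the patch): below is a minimal standalone sketch of the overshadow check that the patch adds to announceHistoricalSegments, using the APIs introduced or exercised in the diff above (VersionedIntervalTimeline.forSegments, JodaUtils.ETERNITY, TimelineObjectHolder, PartitionChunk). The class name OvershadowCheck and the method name findNonOvershadowed are hypothetical and only for illustration; the loop body mirrors the one added to IndexerSQLMetadataStorageCoordinator.

import com.google.common.collect.Sets;
import io.druid.common.utils.JodaUtils;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;

import java.util.Set;

public class OvershadowCheck
{
  /**
   * Returns the subset of "segments" that is not overshadowed by any other segment in the same
   * collection. These are the segments the coordinator now inserts with used = true; everything
   * else in the batch is inserted with used = false.
   */
  public static Set<DataSegment> findNonOvershadowed(Iterable<DataSegment> segments)
  {
    final Set<DataSegment> usedSegments = Sets.newHashSet();

    // Build a timeline of all incoming segments, keyed by interval and version.
    final VersionedIntervalTimeline<String, DataSegment> timeline =
        VersionedIntervalTimeline.forSegments(segments);

    // A lookup over "eternity" returns only the visible (non-overshadowed) chunks: for each
    // interval the timeline exposes the complete partition set with the highest version.
    for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(JodaUtils.ETERNITY)) {
      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
        usedSegments.add(chunk.getObject());
      }
    }
    return usedSegments;
  }
}

Applied to testOvershadowingAnnounce above, this helper would return only defaultSegment4, since (per the comment in the test) it overshadows defaultSegment and defaultSegment2, which matches the test's expectation that getUsedIdentifiers() lists only defaultSegment4's id.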