diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index bc856b220e4..4a7122d1d8f 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -28,6 +28,16 @@ import java.util.Set; */ public interface IndexerMetadataStorageCoordinator { + /** + * Get all segments which may include any data in the interval and are flagged as used. + * + * @param dataSource The datasource to query + * @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive + * + * @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval. + * + * @throws IOException + */ public List getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException; @@ -36,6 +46,7 @@ public interface IndexerMetadataStorageCoordinator * with identifiers already in the metadata storage will not be added). * * @param segments set of segments to add + * * @return set of segments actually added */ public Set announceHistoricalSegments(final Set segments) throws IOException; @@ -45,5 +56,13 @@ public interface IndexerMetadataStorageCoordinator public void deleteSegments(final Set segments) throws IOException; + /** + * Get all segments which include ONLY data within the given interval and are not flagged as used. + * + * @param dataSource The datasource the segments belong to + * @param interval Filter the data segments to ones that include data in this interval exclusively. Start is inclusive, end is exclusive + * + * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval + */ public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index df208a05093..9d3ba1f2856 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -88,11 +88,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final ResultIterator dbSegments = handle.createQuery( String.format( - "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource", + "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true", dbTables.getSegmentsTable() ) ) .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) .map(ByteArrayMapper.FIRST) .iterator(); diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java new file mode 100644 index 00000000000..1ad21c420c3 --- /dev/null +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.io.IOException; +import java.util.Set; + +public class IndexerSQLMetadataStorageCoordinatorTest +{ + + private final MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test"); + private final TestDerbyConnector derbyConnector = new TestDerbyConnector( + Suppliers.ofInstance(new MetadataStorageConnectorConfig()), + Suppliers.ofInstance(tablesConfig) + ); + private final ObjectMapper mapper = new DefaultObjectMapper(); + private final DataSegment defaultSegment = new DataSegment( + "dataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + private final Set segments = ImmutableSet.of(defaultSegment); + IndexerSQLMetadataStorageCoordinator coordinator; + + @Before + public void setUp() + { + mapper.registerSubtypes(LinearShardSpec.class); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + coordinator = new IndexerSQLMetadataStorageCoordinator( + mapper, + tablesConfig, + derbyConnector + ); + } + + @After + public void tearDown() + { + derbyConnector.tearDown(); + } + + private void unUseSegment() + { + Assert.assertEquals( + 1, (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format("UPDATE %s SET used = false WHERE id = :id", tablesConfig.getSegmentsTable()) + ) + .bind("id", defaultSegment.getIdentifier()) + .execute(); + } + } + ) + ); + } + + @Test + public void testSimpleAnnounce() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + tablesConfig.getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) + ); + } + + @Test + public void testSimpleUsedList() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) + ) + ); + } + + @Test + public void testSimpleUnUsedList() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) + ) + ); + } + + + @Test + public void testUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Set actualSegments = ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive + ) + ); + Assert.assertEquals( + segments, + actualSegments + ); + } + + + @Test + public void testUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + ) + ) + ); + } + + @Test + public void testUsedOutOfBoundsLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) + ).isEmpty() + ); + } + + + @Test + public void testUsedOutOfBoundsHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) + ).isEmpty() + ); + } + + @Test + public void testUsedWithinBoundsEnd() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + ) + ) + ); + } + + @Test + public void testUsedOverlapEnd() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + ) + ) + ); + } + + + @Test + public void testUnUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart().plus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedUnderlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + ).isEmpty() + ); + } + + + @Test + public void testUnUsedUnderlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedBigOverlap() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2000/2999") + ) + ) + ); + } + + @Test + public void testUnUsedLowRange() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + ) + ) + ); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + ) + ) + ); + } + + @Test + public void testUnUsedHighRange() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + ) + ) + ); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + ) + ) + ); + } +}