Merge pull request #1437 from metamx/limitUsedSegmentInterval

Add start and end bounds to getUsedSegmentsForInterval
This commit is contained in:
Himanshu 2015-06-22 14:11:38 -05:00
commit 34c8f4c3da
3 changed files with 378 additions and 1 deletions

View File

@ -28,6 +28,16 @@ import java.util.Set;
*/ */
public interface IndexerMetadataStorageCoordinator public interface IndexerMetadataStorageCoordinator
{ {
/**
* Get all segments which may include any data in the interval and are flagged as used.
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
*
* @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException; throws IOException;
@ -36,6 +46,7 @@ public interface IndexerMetadataStorageCoordinator
* with identifiers already in the metadata storage will not be added). * with identifiers already in the metadata storage will not be added).
* *
* @param segments set of segments to add * @param segments set of segments to add
*
* @return set of segments actually added * @return set of segments actually added
*/ */
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException; public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException;
@ -45,5 +56,13 @@ public interface IndexerMetadataStorageCoordinator
public void deleteSegments(final Set<DataSegment> segments) throws IOException; public void deleteSegments(final Set<DataSegment> segments) throws IOException;
/**
* Get all segments which include ONLY data within the given interval and are not flagged as used.
*
* @param dataSource The datasource the segments belong to
* @param interval Filter the data segments to ones that include data in this interval exclusively. Start is inclusive, end is exclusive
*
* @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval
*/
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval); public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval);
} }

View File

@ -88,11 +88,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final ResultIterator<byte[]> dbSegments = final ResultIterator<byte[]> dbSegments =
handle.createQuery( handle.createQuery(
String.format( String.format(
"SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource", "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true",
dbTables.getSegmentsTable() dbTables.getSegmentsTable()
) )
) )
.bind("dataSource", dataSource) .bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST) .map(ByteArrayMapper.FIRST)
.iterator(); .iterator();

View File

@ -0,0 +1,356 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.Set;
public class IndexerSQLMetadataStorageCoordinatorTest
{
private final MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test");
private final TestDerbyConnector derbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(tablesConfig)
);
private final ObjectMapper mapper = new DefaultObjectMapper();
private final DataSegment defaultSegment = new DataSegment(
"dataSource",
Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
"version",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
private final Set<DataSegment> segments = ImmutableSet.of(defaultSegment);
IndexerSQLMetadataStorageCoordinator coordinator;
@Before
public void setUp()
{
mapper.registerSubtypes(LinearShardSpec.class);
derbyConnector.createTaskTables();
derbyConnector.createSegmentTable();
coordinator = new IndexerSQLMetadataStorageCoordinator(
mapper,
tablesConfig,
derbyConnector
);
}
@After
public void tearDown()
{
derbyConnector.tearDown();
}
private void unUseSegment()
{
Assert.assertEquals(
1, (int) derbyConnector.getDBI().<Integer>withHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format("UPDATE %s SET used = false WHERE id = :id", tablesConfig.getSegmentsTable())
)
.bind("id", defaultSegment.getIdentifier())
.execute();
}
}
)
);
}
@Test
public void testSimpleAnnounce() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
derbyConnector.lookup(
tablesConfig.getSegmentsTable(),
"id",
"payload",
defaultSegment.getIdentifier()
)
);
}
@Test
public void testSimpleUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval()
)
)
);
}
@Test
public void testSimpleUnUsedList() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval()
)
)
);
}
@Test
public void testUsedOverlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Set<DataSegment> actualSegments = ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive
)
);
Assert.assertEquals(
segments,
actualSegments
);
}
@Test
public void testUsedOverlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z")
)
)
);
}
@Test
public void testUsedOutOfBoundsLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertTrue(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart())
).isEmpty()
);
}
@Test
public void testUsedOutOfBoundsHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertTrue(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10))
).isEmpty()
);
}
@Test
public void testUsedWithinBoundsEnd() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1))
)
)
);
}
@Test
public void testUsedOverlapEnd() throws IOException
{
coordinator.announceHistoricalSegments(segments);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUsedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1))
)
)
);
}
@Test
public void testUnUsedOverlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart().plus(1))
).isEmpty()
);
}
@Test
public void testUnUsedUnderlapLow() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd())
).isEmpty()
);
}
@Test
public void testUnUsedUnderlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1))
).isEmpty()
);
}
@Test
public void testUnUsedOverlapHigh() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertTrue(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1))
).isEmpty()
);
}
@Test
public void testUnUsedBigOverlap() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
Interval.parse("2000/2999")
)
)
);
}
@Test
public void testUnUsedLowRange() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1))
)
)
);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1))
)
)
);
}
@Test
public void testUnUsedHighRange() throws IOException
{
coordinator.announceHistoricalSegments(segments);
unUseSegment();
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1))
)
)
);
Assert.assertEquals(
segments,
ImmutableSet.copyOf(
coordinator.getUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1))
)
)
);
}
}