Add reload by interval API (#7490)

* Add reload by interval API
Implements the reload proposal of #7439
Added tests and updated docs

* PR updates

* Only build timeline with required segments
Use 404 with message when a segmentId is not found
Fix typo in doc
Return number of segments modified.

* Fix checkstyle errors

* Replace String.format with StringUtils.format

* Remove return value

* Expand timeline to segments that overlap for intervals
Restrict update call to only segments that need updating.

* Only add overlapping enabled segments to the timeline

* Some renames for clarity
Added comments

* Don't rely on cached poll data
Only fetch required information from DB

* Match error style

* Merge and cleanup doc

* Fix String.format call

* Add unit tests

* Fix unit tests that check for overshadowing
Adam Peck 2019-04-26 17:01:50 -06:00 committed by Jonathan Wei
parent 09b7700d13
commit ebdf07b69f
7 changed files with 1042 additions and 76 deletions
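
For context, the feature this commit adds is driven through the Coordinator's datasources API. A minimal sketch of a mark-used-by-interval call follows; the coordinator address, datasource name, and interval are placeholders, not values taken from this commit:

```
POST http://<coordinator-host>:<port>/druid/coordinator/v1/datasources/wikipedia/markUsed
Content-Type: application/json

{"interval": "2017-10-15T00:00:00.000Z/2017-10-18T00:00:00.000Z"}
```

A 200 response indicates at least one segment was marked used, 204 indicates the datasource was not found or no segments needed updating, and 400 indicates an invalid payload (both or neither of `interval` and `segmentIds` supplied).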

View File

@@ -235,10 +235,18 @@ Enables all segments of datasource which are not overshadowed by others.
Enables a segment of a datasource.
* `/druid/coordinator/v1/datasources/{dataSourceName}/markUsed`
* `/druid/coordinator/v1/datasources/{dataSourceName}/markUnused`
Marks segments (un)used for a datasource by interval or set of segment Ids.
When marking used, only segments that are not overshadowed will be updated.
The request payload contains the interval or set of segment Ids to be marked used or unused.
Either an interval or a set of segment Ids should be provided; if both or neither are provided in the payload, the API returns an error (400 BAD REQUEST).
Interval specifies the start and end times as ISO 8601 strings. `interval=(start/end)` where start and end are both inclusive. Only segments completely contained within the specified interval will be updated; partially overlapping segments are not affected.

JSON Request Payload:
@@ -247,7 +255,6 @@ JSON Request Payload:
|`interval`|The interval for which to mark segments unused|"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"|
|`segmentIds`|Set of segment Ids to be marked unused|["segmentId1", "segmentId2"]|
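
For example, the request payload takes one of the two mutually exclusive forms below. The values are illustrative and mirror the examples in the table above; either form works for both `markUsed` and `markUnused`:

```json
{"interval": "2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"}
```

or, equivalently by id:

```json
{"segmentIds": ["segmentId1", "segmentId2"]}
```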
##### DELETE<a name="coordinator-delete"></a>
* `/druid/coordinator/v1/datasources/{dataSourceName}`

View File

@@ -37,10 +37,23 @@ public interface MetadataSegmentManager
  void stop();

  /**
   * Enables all segments for a dataSource which will not be overshadowed.
   */
  boolean enableDataSource(String dataSource);

  boolean enableSegment(String segmentId);

  /**
   * Enables all segments contained in the interval which are not overshadowed by any currently enabled segments.
   */
  int enableSegments(String dataSource, Interval interval);

  /**
   * Enables the segments passed which are not overshadowed by any currently enabled segments.
   */
  int enableSegments(String dataSource, Collection<String> segmentIds);

  boolean removeDataSource(String dataSource);

  /**

View File

@@ -22,7 +22,6 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.client.DruidDataSource;
@@ -30,7 +29,9 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -38,9 +39,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.BaseResultSetMapper;
@@ -48,13 +47,11 @@ import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -220,83 +217,200 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
    }
  }

  private Pair<DataSegment, Boolean> usedPayloadMapper(
      final int index,
      final ResultSet resultSet,
      final StatementContext context
  ) throws SQLException
  {
    try {
      return new Pair<>(
          jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
          resultSet.getBoolean("used")
      );
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Gets a list of all data segments that overlap the provided interval along with their used status.
   */
  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
      final String dataSource,
      final Interval interval
  )
  {
    return connector.inReadOnlyTransaction(
        (handle, status) -> handle.createQuery(
            StringUtils.format(
                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start",
                getSegmentsTable(),
                connector.getQuoteString()
            )
        )
        .setFetchSize(connector.getStreamingFetchSize())
        .bind("dataSource", dataSource)
        .bind("start", interval.getStart().toString())
        .bind("end", interval.getEnd().toString())
        .map(this::usedPayloadMapper)
        .list()
    );
  }

  private List<Pair<DataSegment, Boolean>> getDataSegments(
      final String dataSource,
      final Collection<String> segmentIds,
      final Handle handle
  )
  {
    return segmentIds.stream().map(
        segmentId -> Optional.ofNullable(
            handle.createQuery(
                StringUtils.format(
                    "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
                    getSegmentsTable()
                )
            )
            .bind("dataSource", dataSource)
            .bind("id", segmentId)
            .map(this::usedPayloadMapper)
            .first()
        )
        .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
    )
    .collect(Collectors.toList());
  }

  /**
   * Builds a VersionedIntervalTimeline containing used segments that overlap the intervals passed.
   */
  private VersionedIntervalTimeline<String, DataSegment> buildVersionedIntervalTimeline(
      final String dataSource,
      final Collection<Interval> intervals,
      final Handle handle
  )
  {
    return VersionedIntervalTimeline.forSegments(intervals
        .stream()
        .flatMap(interval -> handle.createQuery(
            StringUtils.format(
                "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true",
                getSegmentsTable(),
                connector.getQuoteString()
            )
        )
        .setFetchSize(connector.getStreamingFetchSize())
        .bind("dataSource", dataSource)
        .bind("start", interval.getStart().toString())
        .bind("end", interval.getEnd().toString())
        .map((i, resultSet, context) -> {
          try {
            return jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
          }
          catch (IOException e) {
            throw new RuntimeException(e);
          }
        })
        .list()
        .stream()
        )
        .iterator()
    );
  }

  @Override
  public boolean enableDataSource(final String dataSource)
  {
    try {
      return enableSegments(dataSource, Intervals.ETERNITY) != 0;
    }
    catch (Exception e) {
      log.error(e, "Exception enabling datasource %s", dataSource);
      return false;
    }
  }

  @Override
  public int enableSegments(final String dataSource, final Interval interval)
  {
    List<Pair<DataSegment, Boolean>> segments = getDataSegmentsOverlappingInterval(dataSource, interval);
    List<DataSegment> segmentsToEnable = segments.stream()
        .filter(segment -> !segment.rhs && interval.contains(segment.lhs.getInterval()))
        .map(segment -> segment.lhs)
        .collect(Collectors.toList());

    VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(
        segments.stream().filter(segment -> segment.rhs).map(segment -> segment.lhs).iterator()
    );
    VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segmentsToEnable.iterator());

    return enableSegments(
        segmentsToEnable,
        versionedIntervalTimeline
    );
  }

  @Override
  public int enableSegments(final String dataSource, final Collection<String> segmentIds)
  {
    Pair<List<DataSegment>, VersionedIntervalTimeline<String, DataSegment>> data = connector.inReadOnlyTransaction(
        (handle, status) -> {
          List<DataSegment> segments = getDataSegments(dataSource, segmentIds, handle)
              .stream()
              .filter(pair -> !pair.rhs)
              .map(pair -> pair.lhs)
              .collect(Collectors.toList());

          VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = buildVersionedIntervalTimeline(
              dataSource,
              JodaUtils.condenseIntervals(segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())),
              handle
          );
          VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator());

          return new Pair<>(
              segments,
              versionedIntervalTimeline
          );
        }
    );

    return enableSegments(
        data.lhs,
        data.rhs
    );
  }

  private int enableSegments(
      final Collection<DataSegment> segments,
      final VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline
  )
  {
    if (segments.isEmpty()) {
      log.warn("No segments found to update!");
      return 0;
    }

    return connector.getDBI().withHandle(handle -> {
      Batch batch = handle.createBatch();
      segments
          .stream()
          .map(segment -> segment.getId())
          .filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(
              segmentId.getInterval(),
              segmentId.getVersion()
          ))
          .forEach(segmentId -> batch.add(
              StringUtils.format(
                  "UPDATE %s SET used=true WHERE id = '%s'",
                  getSegmentsTable(),
                  segmentId
              )
          ));
      return batch.execute().length;
    });
  }

  @Override

View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
/**
* Exception thrown by MetadataSegmentManager when a segment id is unknown.
*/
public class UnknownSegmentIdException extends RuntimeException
{
public UnknownSegmentIdException(String message)
{
super(message);
}
}
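
When the Coordinator's resource layer (changed below) catches this exception, it maps it to a 404 response whose body is shaped roughly like the following; the segment id here is illustrative:

```json
{"message": "Cannot find segment id [segmentId1]"}
```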

View File

@@ -21,7 +21,7 @@ package org.apache.druid.server.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.metadata.UnknownSegmentIdException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -753,6 +754,60 @@ public class DataSourcesResource
    return false;
  }
@POST
@Path("/{dataSourceName}/markUsed")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response enableDatasourceSegments(
@PathParam("dataSourceName") String dataSourceName,
MarkDatasourceSegmentsPayload payload
)
{
if (payload == null || !payload.isValid()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity("Invalid request payload, either interval or segmentIds array must be specified")
.build();
}
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return Response.noContent().build();
}
int modified;
try {
if (payload.getInterval() != null) {
modified = databaseSegmentManager.enableSegments(dataSource.getName(), payload.getInterval());
} else {
modified = databaseSegmentManager.enableSegments(dataSource.getName(), payload.getSegmentIds());
}
}
catch (Exception e) {
if (e.getCause() instanceof UnknownSegmentIdException) {
return Response.status(Response.Status.NOT_FOUND).entity(
ImmutableMap.of(
"message",
e.getCause().getMessage()
)
).build();
}
return Response.serverError().entity(
ImmutableMap.of(
"error",
"Exception occurred.",
"message",
e.getMessage()
)
).build();
}
if (modified == 0) {
return Response.noContent().build();
}
return Response.ok().build();
}
  @VisibleForTesting
  protected static class MarkDatasourceSegmentsPayload
  {

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -32,12 +33,14 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.hamcrest.core.IsInstanceOf;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.stream.Collectors;
@@ -48,6 +51,9 @@ public class SQLMetadataSegmentManagerTest
  @Rule
  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  private SQLMetadataSegmentManager manager;
  private SQLMetadataSegmentPublisher publisher;
  private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
@@ -84,6 +90,21 @@ public class SQLMetadataSegmentManagerTest
      1234L
  );
private void publish(DataSegment segment, boolean used) throws IOException
{
publisher.publishSegment(
segment.getId().toString(),
segment.getDataSource(),
DateTimes.nowUtc().toString(),
segment.getInterval().getStart().toString(),
segment.getInterval().getEnd().toString(),
!(segment.getShardSpec() instanceof NoneShardSpec),
segment.getVersion(),
used,
jsonMapper.writeValueAsBytes(segment)
);
}
  @Before
  public void setUp() throws Exception
  {
@@ -276,6 +297,432 @@ public class SQLMetadataSegmentManagerTest
    Assert.assertTrue(manager.removeSegment(newSegment.getId()));
  }
@Test
public void testEnableSegmentsWithSegmentIds() throws IOException
{
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-16T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
1,
1234L
);
// Overshadowed by newSegment2
final DataSegment newSegment3 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
1,
1234L
);
publish(newSegment1, false);
publish(newSegment2, false);
publish(newSegment3, false);
final ImmutableList<String> segmentIds = ImmutableList.of(
newSegment1.getId().toString(),
newSegment2.getId().toString(),
newSegment3.getId().toString()
);
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
Assert.assertEquals(2, manager.enableSegments(datasource, segmentIds));
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
}
@Test
public void testEnableSegmentsWithSegmentIdsInvalidDatasource() throws IOException
{
thrown.expectCause(IsInstanceOf.instanceOf(UnknownSegmentIdException.class));
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publish(newSegment1, false);
publish(newSegment2, false);
final ImmutableList<String> segmentIds = ImmutableList.of(
newSegment1.getId().toString(),
newSegment2.getId().toString()
);
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
// none of the segments are in datasource
Assert.assertEquals(0, manager.enableSegments("wrongDataSource", segmentIds));
}
@Test
public void testEnableSegmentsWithInvalidSegmentIds()
{
thrown.expectCause(IsInstanceOf.instanceOf(UnknownSegmentIdException.class));
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final ImmutableList<String> segmentIds = ImmutableList.of(
newSegment1.getId().toString(),
newSegment2.getId().toString()
);
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
// none of the segments are in datasource
Assert.assertEquals(0, manager.enableSegments(datasource, segmentIds));
}
@Test
public void testEnableSegmentsWithInterval() throws IOException
{
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-16T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
1,
1234L
);
final DataSegment newSegment3 = new DataSegment(
datasource,
Intervals.of("2017-10-19T00:00:00.000/2017-10-20T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
// Overshadowed by newSegment2
final DataSegment newSegment4 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publish(newSegment1, false);
publish(newSegment2, false);
publish(newSegment3, false);
publish(newSegment4, false);
final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
// 3 of the 4 new segments lie within the interval; newSegment4 is overshadowed by newSegment2, so only 2 get enabled
Assert.assertEquals(2, manager.enableSegments(datasource, theInterval));
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
}
@Test(expected = IllegalArgumentException.class)
public void testEnableSegmentsWithInvalidInterval() throws IOException
{
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publish(newSegment1, false);
publish(newSegment2, false);
// invalid interval start > end
final Interval theInterval = Intervals.of("2017-10-22T00:00:00.000/2017-10-02T00:00:00.000");
manager.enableSegments(datasource, theInterval);
}
@Test
public void testEnableSegmentsWithOverlappingInterval() throws IOException
{
manager.start();
manager.poll();
Assert.assertTrue(manager.isStarted());
final String datasource = "wikipedia2";
final DataSegment newSegment1 = new DataSegment(
datasource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
final DataSegment newSegment2 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-16T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
1,
1234L
);
final DataSegment newSegment3 = new DataSegment(
datasource,
Intervals.of("2017-10-19T00:00:00.000/2017-10-22T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
// Overshadowed by newSegment2
final DataSegment newSegment4 = new DataSegment(
datasource,
Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publish(newSegment1, false);
publish(newSegment2, false);
publish(newSegment3, false);
publish(newSegment4, false);
final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
// only newSegment2 is fully contained in the interval and not overshadowed, so it alone gets enabled; partially overlapping segments are not affected
Assert.assertEquals(1, manager.enableSegments(datasource, theInterval));
manager.poll();
Assert.assertEquals(
ImmutableSet.of(segment1, segment2, newSegment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
}
@Test @Test
public void testDisableSegmentsWithSegmentIds() throws IOException public void testDisableSegmentsWithSegmentIds() throws IOException
{ {

View File

@@ -666,6 +666,305 @@ public class DataSourcesResourceTest
    EasyMock.verify(inventoryView, databaseRuleManager);
  }
@Test
public void testEnableDatasourceSegment()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
EasyMock.expect(metadataSegmentManager.enableSegment(dataSegmentList.get(0).getId().toString()))
.andReturn(true)
.once();
EasyMock.replay(metadataSegmentManager);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
null,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegment(dataSegmentList.get(0).getDataSource(), dataSegmentList.get(0).getId().toString());
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(metadataSegmentManager);
}
@Test
public void testEnableDatasourceSegmentFailed()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
EasyMock.expect(metadataSegmentManager.enableSegment(dataSegmentList.get(0).getId().toString()))
.andReturn(false)
.once();
EasyMock.replay(metadataSegmentManager);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
null,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegment(dataSegmentList.get(0).getDataSource(), dataSegmentList.get(0).getId().toString());
Assert.assertEquals(204, response.getStatus());
EasyMock.verify(metadataSegmentManager);
}
@Test
public void testEnableDatasourceSegmentsInterval()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
Interval interval = Intervals.of("2010-01-22/P1D");
EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
.andReturn(3)
.once();
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
EasyMock.replay(metadataSegmentManager, inventoryView, server);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
interval,
null
)
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(metadataSegmentManager, inventoryView, server);
}
@Test
public void testEnableDatasourceSegmentsIntervalNoneUpdated()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
Interval interval = Intervals.of("2010-01-22/P1D");
EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
.andReturn(0)
.once();
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
EasyMock.replay(metadataSegmentManager, inventoryView, server);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
interval,
null
)
);
Assert.assertEquals(204, response.getStatus());
EasyMock.verify(metadataSegmentManager, inventoryView, server);
}
@Test
public void testEnableDatasourceSegmentsSet()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
Set<String> segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString());
EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)))
.andReturn(3)
.once();
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
EasyMock.replay(metadataSegmentManager, inventoryView, server);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
null,
segmentIds
)
);
Assert.assertEquals(200, response.getStatus());
EasyMock.verify(metadataSegmentManager, inventoryView, server);
}
@Test
public void testEnableDatasourceSegmentsIntervalException()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DruidDataSource dataSource = new DruidDataSource("datasource1", new HashMap<>());
Interval interval = Intervals.of("2010-01-22/P1D");
EasyMock.expect(metadataSegmentManager.enableSegments(EasyMock.eq("datasource1"), EasyMock.eq(interval)))
.andThrow(new RuntimeException("Error!"))
.once();
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource).once();
EasyMock.replay(metadataSegmentManager, inventoryView, server);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
interval,
null
)
);
Assert.assertEquals(500, response.getStatus());
EasyMock.verify(metadataSegmentManager, inventoryView, server);
}
@Test
public void testEnableDatasourceSegmentsNoDatasource()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
EasyMock.expect(server.getDataSource("datasource1")).andReturn(null).once();
EasyMock.replay(metadataSegmentManager, inventoryView, server);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
Intervals.of("2010-01-22/P1D"),
null
)
);
Assert.assertEquals(204, response.getStatus());
EasyMock.verify(metadataSegmentManager);
}
@Test
public void testEnableDatasourceSegmentsInvalidPayloadNoArguments()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
null,
null
)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testEnableDatasourceSegmentsInvalidPayloadBothArguments()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
Intervals.of("2010-01-22/P1D"),
ImmutableSet.of()
)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testEnableDatasourceSegmentsInvalidPayloadEmptyArray()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
new DataSourcesResource.MarkDatasourceSegmentsPayload(
null,
ImmutableSet.of()
)
);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testEnableDatasourceSegmentsNoPayload()
{
MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
DataSourcesResource DataSourcesResource = new DataSourcesResource(
inventoryView,
metadataSegmentManager,
null,
null,
null,
null
);
Response response = DataSourcesResource.enableDatasourceSegments(
"datasource1",
null
);
Assert.assertEquals(400, response.getStatus());
}
  @Test
  public void testSegmentLoadChecksForVersion()
  {