mirror of https://github.com/apache/druid.git
API to drop data by interval (#7494)
* Add api to drop data by interval
* update to address comments
* unused imports
* PR comments + add tests in SQLMetadataSegmentManagerTest
* update tests and docs

parent 6fd6e5de89
commit 8308ffef1f
@@ -220,6 +220,11 @@ Returns full segment metadata for a specific segment in the cluster.

Return the tiers that a datasource exists in.

#### Note for coordinator's POST and DELETE APIs

Segments are enabled when these APIs are called, but they can be disabled again by the coordinator if any dropRule matches. Segments enabled by these APIs might not be loaded by historical processes if no loadRule matches. If an indexing or kill task runs at the same time as these APIs are invoked, the behavior is undefined: some segments might be killed while others are enabled. It is also possible that all segments are disabled while the indexing task is still able to read data from those segments and succeed.

Caution: Avoid using indexing or kill tasks and these APIs at the same time for the same datasource and time chunk. (It's fine if the time chunks or datasources don't overlap.)

##### POST

* `/druid/coordinator/v1/datasources/{dataSourceName}`
@@ -230,6 +235,19 @@ Enables all segments of datasource which are not overshadowed by others.

Enables a segment of a datasource.

* `/druid/coordinator/v1/datasources/{dataSourceName}/markUnused`

Marks segments of a datasource unused, by interval or by a set of segment IDs. The request payload contains the interval or the set of segment IDs to be marked unused.

Either an interval or a set of segment IDs must be provided; if both or neither are provided in the payload, the API returns an error (400 BAD REQUEST). The interval specifies the start and end times as ISO 8601 strings, `interval=(start/end)`, where start and end are both inclusive. Only segments completely contained within the specified interval will be disabled; partially overlapping segments will not be affected. For example, a segment spanning 2015-09-12T04:00/06:00 only partially overlaps the interval 2015-09-12T03:00/05:00, so it would not be disabled.

JSON Request Payload:
|Key|Description|Example|
|----------|-------------|---------|
|`interval`|The interval for which to mark segments unused|"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"|
|`segmentIds`|Set of segment IDs to be marked unused|["segmentId1", "segmentId2"]|
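As an illustration of the two payload shapes described above (values are taken from the table; the segment IDs are placeholders), a request body using an interval would look like:

```json
{
  "interval": "2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"
}
```

and a request body using a set of segment IDs would look like:

```json
{
  "segmentIds": ["segmentId1", "segmentId2"]
}
```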
##### DELETE<a name="coordinator-delete"></a>

* `/druid/coordinator/v1/datasources/{dataSourceName}`
@@ -53,6 +53,10 @@ public interface MetadataSegmentManager

  boolean removeSegment(SegmentId segmentId);

  long disableSegments(String dataSource, Collection<String> segmentIds);

  int disableSegments(String dataSource, Interval interval);

  boolean isStarted();

  @Nullable
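A minimal sketch of how a caller might use the two new interface methods (the `segmentManager` variable, datasource name, and literal values here are hypothetical; `Intervals` and `ImmutableList` are the helpers used elsewhere in this diff):

```java
// Disable specific segments; returns how many were actually marked unused.
long disabledById = segmentManager.disableSegments(
    "wikipedia",
    ImmutableList.of("segmentId1", "segmentId2")
);

// Disable every segment fully contained in the interval.
int disabledByInterval = segmentManager.disableSegments(
    "wikipedia",
    Intervals.of("2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z")
);
```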
@@ -61,6 +61,7 @@ import java.io.IOException;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -407,6 +408,61 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
    }
  }

  @Override
  public long disableSegments(String dataSource, Collection<String> segmentIds)
  {
    if (segmentIds.isEmpty()) {
      return 0;
    }
    final long[] result = new long[1];
    try {
      connector.getDBI().withHandle(handle -> {
        Batch batch = handle.createBatch();
        segmentIds.forEach(segmentId -> batch.add(
            StringUtils.format(
                "UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s' ",
                getSegmentsTable(),
                dataSource,
                segmentId
            )
        ));
        final int[] resultArr = batch.execute();
        // Count only the statements that actually updated a row.
        result[0] = Arrays.stream(resultArr).filter(x -> x > 0).count();
        return result[0];
      });
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    return result[0];
  }

  @Override
  public int disableSegments(String dataSource, Interval interval)
  {
    try {
      return connector.getDBI().withHandle(
          handle -> handle
              .createStatement(
                  StringUtils.format(
                      "UPDATE %s SET used=false WHERE datasource = :datasource "
                      + "AND start >= :start AND %2$send%2$s <= :end",
                      getSegmentsTable(),
                      // "end" can be a reserved word, so it is wrapped in the connector's quote string.
                      connector.getQuoteString()
                  ))
              .bind("datasource", dataSource)
              .bind("start", interval.getStart().toString())
              .bind("end", interval.getEnd().toString())
              .execute()
      );
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private boolean removeSegmentFromTable(String segmentId)
  {
    final int removed = connector.getDBI().withHandle(
@@ -19,6 +19,9 @@

package org.apache.druid.server.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
@@ -686,6 +689,55 @@ public class DataSourcesResource
    }
  }

  @POST
  @Path("/{dataSourceName}/markUnused")
  @ResourceFilters(DatasourceResourceFilter.class)
  @Produces(MediaType.APPLICATION_JSON)
  @Consumes(MediaType.APPLICATION_JSON)
  public Response markDatasourceUnused(
      @PathParam("dataSourceName") final String dataSourceName,
      final MarkDatasourceSegmentsPayload payload
  )
  {
    // 400 when neither or both of interval and segmentIds are given.
    if (payload == null || !payload.isValid()) {
      return Response.status(Response.Status.BAD_REQUEST)
                     .entity("Invalid request payload, either interval or segmentIds array must be specified")
                     .build();
    }

    final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
    if (dataSource == null) {
      log.warn("datasource not found [%s]", dataSourceName);
      return Response.noContent().build();
    }

    long markedSegmentCount = 0;
    try {
      final Interval interval = payload.getInterval();
      final Set<String> segmentIds = payload.getSegmentIds();
      if (interval != null) {
        markedSegmentCount = databaseSegmentManager.disableSegments(dataSourceName, interval);
      } else if (segmentIds != null) {
        markedSegmentCount = databaseSegmentManager.disableSegments(dataSourceName, segmentIds);
      }
    }
    catch (Exception e) {
      return Response.serverError().entity(
          ImmutableMap.of(
              "error", "Exception occurred.",
              "message", e.toString()
          )
      ).build();
    }
    // 204 when the datasource was not found or nothing was marked unused, 200 otherwise.
    if (markedSegmentCount == 0) {
      return Response.noContent().build();
    }
    return Response.ok().build();
  }

  static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
  {
    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
@@ -700,4 +752,38 @@ public class DataSourcesResource
    }
    return false;
  }

  @VisibleForTesting
  protected static class MarkDatasourceSegmentsPayload
  {
    private final Interval interval;
    private final Set<String> segmentIds;

    @JsonCreator
    public MarkDatasourceSegmentsPayload(
        @JsonProperty("interval") Interval interval,
        @JsonProperty("segmentIds") Set<String> segmentIds
    )
    {
      this.interval = interval;
      this.segmentIds = segmentIds;
    }

    @JsonProperty
    public Interval getInterval()
    {
      return interval;
    }

    @JsonProperty
    public Set<String> getSegmentIds()
    {
      return segmentIds;
    }

    public boolean isValid()
    {
      // Exactly one of interval and segmentIds must be set, and segmentIds, when set, must be non-empty.
      return (interval == null ^ segmentIds == null) && (segmentIds == null || !segmentIds.isEmpty());
    }
  }
}
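To make the validation rule concrete, here is how `isValid()` behaves for each payload shape (an illustrative sketch; `interval` stands for any non-null Interval and the segment IDs are placeholders):

```java
new MarkDatasourceSegmentsPayload(interval, null).isValid();                          // true: interval only
new MarkDatasourceSegmentsPayload(null, ImmutableSet.of("segmentId1")).isValid();     // true: segmentIds only
new MarkDatasourceSegmentsPayload(interval, ImmutableSet.of("segmentId1")).isValid(); // false: both given
new MarkDatasourceSegmentsPayload(null, null).isValid();                              // false: neither given
new MarkDatasourceSegmentsPayload(null, ImmutableSet.of()).isValid();                 // false: empty segmentIds
```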
@@ -32,6 +32,7 @@ import org.apache.druid.segment.TestHelper;

import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -275,6 +276,302 @@ public class SQLMetadataSegmentManagerTest
    Assert.assertTrue(manager.removeSegment(newSegment.getId()));
  }

  @Test
  public void testDisableSegmentsWithSegmentIds() throws IOException
  {
    manager.start();
    manager.poll();
    Assert.assertTrue(manager.isStarted());

    final String datasource = "wikipedia2";
    final DataSegment newSegment1 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment2 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    publisher.publishSegment(newSegment1);
    publisher.publishSegment(newSegment2);
    final ImmutableList<String> segmentIds = ImmutableList.of(
        newSegment1.getId().toString(),
        newSegment2.getId().toString()
    );

    Assert.assertEquals(segmentIds.size(), manager.disableSegments(datasource, segmentIds));
    manager.poll();
    Assert.assertEquals(
        ImmutableSet.of(segment1, segment2),
        ImmutableSet.copyOf(manager.iterateAllSegments())
    );
  }
  @Test
  public void testDisableSegmentsWithSegmentIdsInvalidDatasource() throws IOException
  {
    manager.start();
    manager.poll();
    Assert.assertTrue(manager.isStarted());

    final String datasource = "wikipedia2";
    final DataSegment newSegment1 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment2 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    publisher.publishSegment(newSegment1);
    publisher.publishSegment(newSegment2);
    final ImmutableList<String> segmentIds = ImmutableList.of(
        newSegment1.getId().toString(),
        newSegment2.getId().toString()
    );
    // none of the segments are in datasource
    Assert.assertEquals(0, manager.disableSegments("wrongDataSource", segmentIds));
    manager.poll();
    Assert.assertEquals(
        ImmutableSet.of(segment1, segment2, newSegment1, newSegment2),
        ImmutableSet.copyOf(manager.iterateAllSegments())
    );
  }
  @Test
  public void testDisableSegmentsWithInterval() throws IOException
  {
    manager.start();
    manager.poll();
    Assert.assertTrue(manager.isStarted());

    final String datasource = "wikipedia2";
    final DataSegment newSegment1 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment2 = new DataSegment(
        datasource,
        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment3 = new DataSegment(
        datasource,
        Intervals.of("2017-10-19T00:00:00.000/2017-10-20T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    publisher.publishSegment(newSegment1);
    publisher.publishSegment(newSegment2);
    publisher.publishSegment(newSegment3);
    final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000");

    // 2 out of 3 segments match the interval
    Assert.assertEquals(2, manager.disableSegments(datasource, theInterval));

    manager.poll();
    Assert.assertEquals(
        ImmutableSet.of(segment1, segment2, newSegment3),
        ImmutableSet.copyOf(manager.iterateAllSegments())
    );
  }
  @Test(expected = IllegalArgumentException.class)
  public void testDisableSegmentsWithInvalidInterval() throws IOException
  {
    manager.start();
    manager.poll();
    Assert.assertTrue(manager.isStarted());

    final String datasource = "wikipedia2";
    final DataSegment newSegment1 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment2 = new DataSegment(
        datasource,
        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    publisher.publishSegment(newSegment1);
    publisher.publishSegment(newSegment2);
    // invalid interval start > end
    final Interval theInterval = Intervals.of("2017-10-22T00:00:00.000/2017-10-02T00:00:00.000");
    manager.disableSegments(datasource, theInterval);
  }
  @Test
  public void testDisableSegmentsWithOverlappingInterval() throws IOException
  {
    manager.start();
    manager.poll();
    Assert.assertTrue(manager.isStarted());

    final String datasource = "wikipedia2";
    final DataSegment newSegment1 = new DataSegment(
        datasource,
        Intervals.of("2017-10-15T00:00:00.000/2017-10-17T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment2 = new DataSegment(
        datasource,
        Intervals.of("2017-10-17T00:00:00.000/2017-10-18T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    final DataSegment newSegment3 = new DataSegment(
        datasource,
        Intervals.of("2017-10-19T00:00:00.000/2017-10-22T00:00:00.000"),
        "2017-10-15T20:19:12.565Z",
        ImmutableMap.of(
            "type", "s3_zip",
            "bucket", "test",
            "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
        ),
        ImmutableList.of("dim1", "dim2", "dim3"),
        ImmutableList.of("count", "value"),
        NoneShardSpec.instance(),
        0,
        1234L
    );

    publisher.publishSegment(newSegment1);
    publisher.publishSegment(newSegment2);
    publisher.publishSegment(newSegment3);
    final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000");

    // 1 of the 3 segments is fully contained in the interval; the other 2 only partially overlap and stay enabled
    Assert.assertEquals(1, manager.disableSegments(datasource, theInterval));

    manager.poll();
    Assert.assertEquals(
        ImmutableSet.of(segment1, segment2, newSegment1, newSegment3),
        ImmutableSet.copyOf(manager.iterateAllSegments())
    );
  }

  @Test
  public void testStopAndStart()
  {
@@ -31,6 +31,7 @@ import org.apache.druid.client.SegmentLoadInfo;

import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -801,6 +802,273 @@ public class DataSourcesResourceTest
    );
  }

  @Test
  public void testMarkDatasourceUnusedWithSegments()
  {
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final Set<String> segmentIds = dataSegmentList.stream()
                                                  .map(ds -> ds.getId().toString())
                                                  .collect(Collectors.toSet());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds)).andReturn(1L).once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(null, segmentIds);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(200, response.getStatus());
    Assert.assertNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedWithSegmentsNoContent()
  {
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final Set<String> segmentIds = dataSegmentList.stream()
                                                  .map(ds -> ds.getId().toString())
                                                  .collect(Collectors.toSet());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds)).andReturn(0L).once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(null, segmentIds);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(204, response.getStatus());
    Assert.assertNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedWithSegmentsException()
  {
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final Set<String> segmentIds = dataSegmentList.stream()
                                                  .map(ds -> ds.getId().toString())
                                                  .collect(Collectors.toSet());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", segmentIds))
            .andThrow(new RuntimeException("Exception occurred"))
            .once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(null, segmentIds);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(500, response.getStatus());
    Assert.assertNotNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedWithInterval()
  {
    final Interval theInterval = Intervals.of("2010-01-01/P1D");
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval)).andReturn(1).once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(theInterval, null);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(200, response.getStatus());
    Assert.assertNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedWithIntervalNoContent()
  {
    final Interval theInterval = Intervals.of("2010-01-01/P1D");
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval)).andReturn(0).once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(theInterval, null);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(204, response.getStatus());
    Assert.assertNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedWithIntervalException()
  {
    final Interval theInterval = Intervals.of("2010-01-01/P1D");
    final DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);

    EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server)).once();
    EasyMock.expect(server.getDataSource("datasource1")).andReturn(dataSource1).once();
    EasyMock.expect(segmentManager.disableSegments("datasource1", theInterval))
            .andThrow(new RuntimeException("Exception occurred"))
            .once();
    EasyMock.replay(segmentManager, inventoryView, server);

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(theInterval, null);

    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        new AuthConfig(),
        null
    );
    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(500, response.getStatus());
    Assert.assertNotNull(response.getEntity());
    EasyMock.verify(segmentManager, inventoryView, server);
  }
  @Test
  public void testMarkDatasourceUnusedNullPayload()
  {
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        null,
        null
    );

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload = null;

    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(400, response.getStatus());
    Assert.assertNotNull(response.getEntity());
    Assert.assertEquals(
        "Invalid request payload, either interval or segmentIds array must be specified",
        response.getEntity()
    );
  }
  @Test
  public void testMarkDatasourceUnusedInvalidPayload()
  {
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        null,
        null
    );

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(null, null);

    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(400, response.getStatus());
    Assert.assertNotNull(response.getEntity());
  }
  @Test
  public void testMarkDatasourceUnusedInvalidPayloadBothArguments()
  {
    final MetadataSegmentManager segmentManager = EasyMock.createMock(MetadataSegmentManager.class);
    DataSourcesResource dataSourcesResource = new DataSourcesResource(
        inventoryView,
        segmentManager,
        null,
        null,
        null,
        null
    );

    final DataSourcesResource.MarkDatasourceSegmentsPayload payload =
        new DataSourcesResource.MarkDatasourceSegmentsPayload(
            Intervals.of("2010-01-01/P1D"),
            ImmutableSet.of()
        );

    Response response = dataSourcesResource.markDatasourceUnused("datasource1", payload);
    Assert.assertEquals(400, response.getStatus());
    Assert.assertNotNull(response.getEntity());
  }

  private DruidServerMetadata createRealtimeServerMetadata(String name)
  {
    return createServerMetadata(name, ServerType.REALTIME);
  }