Handoff should ignore segments that are dropped by drop rules (#6676)

* Handoff should ignore segments that are dropped by drop rules

* fix travis-ci

* fix tests

* address comments

* remove line added by accident

* address comments

* add javadoc and log the full stack trace of exceptions

* add error message
Mingming Qiu 2019-01-08 06:43:11 +08:00 committed by Jihoon Son
parent b88e6304c4
commit 8ebb7b558b
5 changed files with 388 additions and 280 deletions
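For orientation: this change replaces the handoff notifier's old server-view scan with a single Coordinator API call. A request of the following shape (the datasource, interval, partition number, and version values here are illustrative) now answers the handoff question directly with a JSON boolean:

GET /druid/coordinator/v1/datasources/wikipedia/handoffComplete?interval=2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z&partitionNumber=0&version=v1

The Coordinator responds with true when the segment is already loaded on a replicatable (historical) server, or when drop rules mean it will never be handed off at all; otherwise it responds with false.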

CoordinatorClient.java (org.apache.druid.client.coordinator)

@@ -23,16 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
import java.util.List;
public class CoordinatorClient
{
@@ -49,18 +46,20 @@ public class CoordinatorClient
this.druidLeaderClient = druidLeaderClient;
}
public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor)
{
try {
FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
dataSource,
interval.toString().replace('/', '_'),
incompleteOk
))
druidLeaderClient.makeRequest(
HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
dataSource,
descriptor.getInterval(),
descriptor.getPartitionNumber(),
descriptor.getVersion()
)
)
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -70,12 +69,9 @@ public class CoordinatorClient
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>()
{
}
);
return jsonMapper.readValue(response.getContent(), new TypeReference<Boolean>()
{
});
}
catch (Exception e) {
throw Throwables.propagate(e);
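
A minimal caller-side sketch of the new client method (the wrapper class, datasource name, interval, version, and partition number below are illustrative; only isHandOffComplete itself comes from this commit):

import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;

final class HandoffPollSketch
{
  static boolean pollOnce(CoordinatorClient coordinatorClient)
  {
    // SegmentDescriptor takes (interval, version, partitionNumber), matching its use elsewhere in this diff.
    SegmentDescriptor descriptor =
        new SegmentDescriptor(Intervals.of("2019-01-01/2019-01-02"), "v1", 0);
    // true  -> already loaded on a historical, or covered only by drop rules (never handed off)
    // false -> not handed off yet; the caller keeps polling
    return coordinatorClient.isHandOffComplete("wikipedia", descriptor);
  }
}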

CoordinatorBasedSegmentHandoffNotifier.java (org.apache.druid.segment.realtime.plumber)

@@ -19,18 +19,13 @@
package org.apache.druid.segment.realtime.plumber;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,13 +90,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
dataSource,
descriptor.getInterval(),
true
);
if (isHandOffComplete(loadedSegments, entry.getKey())) {
if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
@@ -131,30 +120,6 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier
}
}
static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
{
for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& Iterables.any(
segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
{
@Override
public boolean apply(DruidServerMetadata input)
{
return input.segmentReplicatable();
}
}
)) {
return true;
}
}
return false;
}
@Override
public void close()
{

DatasourcesResource.java (org.apache.druid.server.http)

@@ -37,8 +37,13 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -85,6 +90,7 @@ public class DatasourcesResource
private final CoordinatorServerView serverInventoryView;
private final MetadataSegmentManager databaseSegmentManager;
private final MetadataRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
private final AuthConfig authConfig;
private final AuthorizerMapper authorizerMapper;
@@ -93,6 +99,7 @@ public class DatasourcesResource
public DatasourcesResource(
CoordinatorServerView serverInventoryView,
MetadataSegmentManager databaseSegmentManager,
MetadataRuleManager databaseRuleManager,
@Nullable IndexingServiceClient indexingServiceClient,
AuthConfig authConfig,
AuthorizerMapper authorizerMapper
@@ -100,6 +107,7 @@ public class DatasourcesResource
{
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
this.databaseRuleManager = databaseRuleManager;
this.indexingServiceClient = indexingServiceClient;
this.authConfig = authConfig;
this.authorizerMapper = authorizerMapper;
@@ -647,4 +655,85 @@ public class DatasourcesResource
);
return Response.ok(retval).build();
}
/**
* Used by the realtime tasks to learn whether a segment is handed off or not.
* It returns true when the segment will never be handed off or is already handed off. Otherwise, it returns false.
*/
@GET
@Path("/{dataSourceName}/handoffComplete")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response isHandOffComplete(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("interval") final String interval,
@QueryParam("partitionNumber") final int partitionNumber,
@QueryParam("version") final String version
)
{
try {
final List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSourceName);
final Interval theInterval = Intervals.of(interval);
final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber);
final DateTime now = DateTimes.nowUtc();
// dropped means a segment will never be handed off, i.e it completed hand off
// init to true, reset to false only if this segment can be loaded by rules
boolean dropped = true;
for (Rule rule : rules) {
if (rule.appliesTo(theInterval, now)) {
if (rule instanceof LoadRule) {
dropped = false;
}
break;
}
}
if (dropped) {
return Response.ok(true).build();
}
TimelineLookup<String, SegmentLoadInfo> timeline = serverInventoryView.getTimeline(
new TableDataSource(dataSourceName)
);
if (timeline == null) {
log.debug("No timeline found for datasource[%s]", dataSourceName);
return Response.ok(false).build();
}
Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup = timeline.lookupWithIncompletePartitions(
theInterval);
FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable = FunctionalIterable
.create(lookup).transformCat(
(TimelineObjectHolder<String, SegmentLoadInfo> input) ->
Iterables.transform(
input.getObject(),
(PartitionChunk<SegmentLoadInfo> chunk) ->
chunk.getObject().toImmutableSegmentLoadInfo()
)
);
if (isSegmentLoaded(loadInfoIterable, descriptor)) {
return Response.ok(true).build();
}
return Response.ok(false).build();
}
catch (Exception e) {
log.error(e, "Error while handling hand off check request");
return Response.serverError().entity(ImmutableMap.of("error", e.toString())).build();
}
}
static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor)
{
for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& Iterables.any(
segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
)) {
return true;
}
}
return false;
}
}
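
The subtle part of the new endpoint is rule evaluation: rules are ordered, only the first rule that applies to the segment's interval counts, and only a LoadRule keeps the segment eligible for handoff. A condensed sketch of that logic (the wrapper class and method name are illustrative; the loop body mirrors isHandOffComplete above):

import java.util.List;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Interval;

final class HandoffRuleSketch
{
  // Returns true when no applicable load rule exists, i.e. the segment will never be
  // loaded by historicals, so handoff is considered trivially complete.
  static boolean isDroppedByRules(List<Rule> rules, Interval segmentInterval, DateTime now)
  {
    boolean dropped = true;
    for (Rule rule : rules) {
      if (rule.appliesTo(segmentInterval, now)) {
        if (rule instanceof LoadRule) {
          dropped = false;
        }
        break; // only the first matching rule matters
      }
    }
    return dropped;
  }
}

With the rules used in the test below (a load rule for 2013-01-02/2013-01-03 and a drop rule for 2013-01-01/2013-01-02), a segment inside the drop interval yields true immediately, while a segment inside the load interval falls through to the timeline check.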

CoordinatorBasedSegmentHandoffNotifierTest.java (org.apache.druid.segment.realtime.plumber)

@@ -19,23 +19,16 @@
package org.apache.druid.segment.realtime.plumber;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import junit.framework.Assert;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -55,27 +48,10 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
DataSegment segment = new DataSegment(
"test_ds",
interval,
"v1",
null,
null,
null,
new NumberedShardSpec(2, 3),
0, 0
);
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
.andReturn(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
segment,
Sets.newHashSet(createRealtimeServerMetadata("a1"))
)
)
)
EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
.andReturn(false)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -102,27 +78,11 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
DataSegment segment = new DataSegment(
"test_ds",
interval,
"v1",
null,
null,
null,
new NumberedShardSpec(2, 3),
0, 0
);
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval, true))
.andReturn(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
segment,
Sets.newHashSet(createHistoricalServerMetadata("a1"))
)
)
)
EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
.andReturn(true)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
@@ -144,177 +104,4 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
Assert.assertTrue(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
@Test
public void testHandoffChecksForVersion()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v2", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v2", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForAssignableServer()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createRealtimeServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForPartitionNumber()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 1)
)
);
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testHandoffChecksForInterval()
{
Assert.assertFalse(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
)
);
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
)
);
}
private DruidServerMetadata createRealtimeServerMetadata(String name)
{
return createServerMetadata(name, ServerType.REALTIME);
}
private DruidServerMetadata createHistoricalServerMetadata(String name)
{
return createServerMetadata(name, ServerType.HISTORICAL);
}
private DruidServerMetadata createServerMetadata(String name, ServerType type)
{
return new DruidServerMetadata(
name,
name,
null,
10000,
type,
"tier",
1
);
}
private DataSegment createSegment(Interval interval, String version, int partitionNumber)
{
return new DataSegment(
"test_ds",
interval,
version,
null,
null,
null,
new NumberedShardSpec(partitionNumber, 100),
0, 0
);
}
}

DatasourcesResourceTest.java (org.apache.druid.server.http)

@@ -21,13 +21,23 @@ package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -37,6 +47,11 @@ import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -46,6 +61,7 @@ import org.junit.Test;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -156,6 +172,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -240,6 +257,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
null,
new AuthConfig(),
authMapper
);
@@ -294,6 +312,7 @@ public class DatasourcesResourceTest
inventoryView,
null,
null,
null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -323,7 +342,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", "full");
ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
Assert.assertEquals(200, response.getStatus());
@@ -340,7 +359,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Assert.assertEquals(204, datasourcesResource.getTheDataSource("none", null).getStatus());
EasyMock.verify(inventoryView, server);
}
@@ -361,7 +380,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -400,7 +419,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView, server, server2, server3);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -431,7 +450,7 @@ public class DatasourcesResourceTest
List<Interval> expectedIntervals = new ArrayList<>();
expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null);
Assert.assertEquals(response.getEntity(), null);
@@ -478,7 +497,7 @@ public class DatasourcesResourceTest
).atLeastOnce();
EasyMock.replay(inventoryView);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getSegmentDataSourceSpecificInterval(
"invalidDataSource",
"2010-01-01/P1D",
@@ -548,6 +567,7 @@ public class DatasourcesResourceTest
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
null,
indexingServiceClient,
new AuthConfig(),
null
@@ -567,6 +587,7 @@ public class DatasourcesResourceTest
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
null,
indexingServiceClient,
new AuthConfig(),
null
@@ -579,4 +600,254 @@ public class DatasourcesResourceTest
EasyMock.verify(indexingServiceClient, server);
}
@Test
public void testIsHandOffComplete()
{
MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
databaseRuleManager,
null,
new AuthConfig(),
null
);
// test dropped
EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
.andReturn(ImmutableList.of(loadRule, dropRule))
.once();
EasyMock.replay(databaseRuleManager);
String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
Response response1 = datasourcesResource.isHandOffComplete("dataSource1", interval1, 1, "v1");
Assert.assertTrue((boolean) response1.getEntity());
EasyMock.verify(databaseRuleManager);
// test isn't dropped and no timeline found
EasyMock.reset(databaseRuleManager);
EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
.andReturn(ImmutableList.of(loadRule, dropRule))
.once();
EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
.andReturn(null)
.once();
EasyMock.replay(inventoryView, databaseRuleManager);
String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z";
Response response2 = datasourcesResource.isHandOffComplete("dataSource1", interval2, 1, "v1");
Assert.assertFalse((boolean) response2.getEntity());
EasyMock.verify(inventoryView, databaseRuleManager);
// test isn't dropped and timeline exist
String interval3 = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z";
SegmentLoadInfo segmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval3), "v1", 1));
segmentLoadInfo.addServer(createHistoricalServerMetadata("test"));
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new VersionedIntervalTimeline<String, SegmentLoadInfo>(
null)
{
@Override
public List<TimelineObjectHolder<String, SegmentLoadInfo>> lookupWithIncompletePartitions(Interval interval)
{
PartitionHolder<SegmentLoadInfo> partitionHolder = new PartitionHolder<>(new NumberedPartitionChunk<>(
1,
1,
segmentLoadInfo
));
List<TimelineObjectHolder<String, SegmentLoadInfo>> ret = new ArrayList<>();
ret.add(new TimelineObjectHolder<>(Intervals.of(interval3), "v1", partitionHolder));
return ret;
}
};
EasyMock.reset(inventoryView, databaseRuleManager);
EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
.andReturn(ImmutableList.of(loadRule, dropRule))
.once();
EasyMock.expect(inventoryView.getTimeline(new TableDataSource("dataSource1")))
.andReturn(timeline)
.once();
EasyMock.replay(inventoryView, databaseRuleManager);
Response response3 = datasourcesResource.isHandOffComplete("dataSource1", interval3, 1, "v1");
Assert.assertTrue((boolean) response3.getEntity());
EasyMock.verify(inventoryView, databaseRuleManager);
}
@Test
public void testSegmentLoadChecksForVersion()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertFalse(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v2", 2)
)
);
Assert.assertTrue(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v2", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertTrue(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testSegmentLoadChecksForAssignableServer()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
Assert.assertFalse(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 2),
Sets.newHashSet(createRealtimeServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testSegmentLoadChecksForPartitionNumber()
{
Interval interval = Intervals.of(
"2011-04-01/2011-04-02"
);
Assert.assertTrue(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 1)
)
);
Assert.assertFalse(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(interval, "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(interval, "v1", 2)
)
);
}
@Test
public void testSegmentLoadChecksForInterval()
{
Assert.assertFalse(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1)
)
);
Assert.assertTrue(
DatasourcesResource.isSegmentLoaded(
Collections.singletonList(
new ImmutableSegmentLoadInfo(
createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1),
Sets.newHashSet(createHistoricalServerMetadata("a"))
)
),
new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1)
)
);
}
private DruidServerMetadata createRealtimeServerMetadata(String name)
{
return createServerMetadata(name, ServerType.REALTIME);
}
private DruidServerMetadata createHistoricalServerMetadata(String name)
{
return createServerMetadata(name, ServerType.HISTORICAL);
}
private DruidServerMetadata createServerMetadata(String name, ServerType type)
{
return new DruidServerMetadata(
name,
name,
null,
10000,
type,
"tier",
1
);
}
private DataSegment createSegment(Interval interval, String version, int partitionNumber)
{
return new DataSegment(
"test_ds",
interval,
version,
null,
null,
null,
new NumberedShardSpec(partitionNumber, 100),
0, 0
);
}
}