Add paramter to loadstatus API to compute underdeplication against cluster view (#11056)

* Add paramter to loadstatus API to compute underdeplication against cluster view

This change adds a query parameter `computeUsingClusterView` to loadstatus apis
that if specified have the coordinator compute undereplication for segments based
on the number of services available within cluster that the segment can be replicated
on, instead of the configured replication count configured in load rule. A default
load rule is created in all clusters that specified that all segments should be
replicated 2 times. As replicas are forced to be on separate nodes in the cluster,
this causes the loadstatus api to report that there are under-replicated segments
when there is only 1 data server in the cluster. In this case, calling loadstatus
api without this new query parameter will always result in a response indicating
under-replication of segments

* * fix exception mapper

* * Address review comments

* * update external API docs

* Apply suggestions from code review

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>

* * update more external docs

* * update javadoc

* Apply suggestions from code review

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>

Co-authored-by: Charles Smith <38529548+techdocsmith@users.noreply.github.com>
This commit is contained in:
zachjsh 2021-04-05 00:02:43 -04:00 committed by GitHub
parent 470d659ca0
commit 8cf1e83543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 337 additions and 45 deletions

View File

@ -86,6 +86,8 @@ Coordinator of the cluster. In addition, returns HTTP 200 if the server is the c
This is suitable for use as a load balancer status check if you only want the active leader to be considered in-service
at the load balancer.
<a name="coordinator-segment-loading"></a>
#### Segment Loading
##### GET
@ -102,6 +104,14 @@ Returns the number of segments left to load until segments that should be loaded
Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.
* `/druid/coordinator/v1/loadstatus?full?computeUsingClusterView`
Returns the number of segments not yet loaded for each tier until all segments loading in the cluster are available.
The result includes segment replication counts. It also factors in the number of available nodes that are of a service type that can load the segment when computing the number of segments remaining to load.
A segment is considered fully loaded when:
- Druid has replicated it the number of times configured in the corresponding load rule.
- Or the number of replicas for the segment in each tier where it is configured to be replicated equals the available nodes of a service type that are currently allowed to load the segment in the tier.
* `/druid/coordinator/v1/loadqueue`
Returns the ids of segments to load and drop for each Historical process.
@ -151,6 +161,8 @@ Setting `forceMetadataRefresh` to true will force the coordinator to poll latest
(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status)
Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh.
You can pass the optional query parameter `computeUsingClusterView` to factor in the available cluster services when calculating
the segments left to load. See [Coordinator Segment Loading](#coordinator-segment-loading) for details.
If no used segments are found for the given inputs, this API returns `204 No Content`
#### Metadata store information

View File

@ -76,6 +76,7 @@ import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@ -157,6 +158,7 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private volatile DruidCluster cluster = null;
private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;
@ -280,7 +282,16 @@ public class DruidCoordinator
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false);
}
/**
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView()
{
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true);
}
/**
@ -295,37 +306,22 @@ public class DruidCoordinator
Iterable<DataSegment> dataSegments
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false);
}
final DateTime now = DateTimes.nowUtc();
for (final DataSegment segment : dataSegments) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
if (!rule.appliesTo(segment, now)) {
// Rule did not match. Continue to the next Rule.
continue;
}
if (!rule.canLoadSegments()) {
// Rule matched but rule does not and cannot load segments.
// Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map
break;
}
rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment);
// Only the first matching rule applies. This is because the Coordinator cycle through all used segments
// and match each segment with the first rule that applies. Each segment may only match a single rule.
break;
}
}
return underReplicationCountsPerDataSourcePerTier;
/**
* segmentReplicantLookup or cluster use in this method could potentially be stale since it is only updated on coordinator runs.
* However, this is ok as long as the {@param dataSegments} is refreshed/latest as this would at least still ensure
* that the stale data in segmentReplicantLookup and cluster would be under counting replication levels,
* rather than potentially falsely reporting that everything is available.
*
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(
Iterable<DataSegment> dataSegments
)
{
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true);
}
public Object2IntMap<String> computeNumsUnavailableUsedSegmentsPerDataSource()
@ -584,6 +580,58 @@ public class DruidCoordinator
compactSegmentsDuty.run();
}
private Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(
Iterable<DataSegment> dataSegments,
boolean computeUsingClusterView
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
}
if (computeUsingClusterView && cluster == null) {
throw new ServiceUnavailableException(
"coordinator hasn't populated information about cluster yet, try again later");
}
final DateTime now = DateTimes.nowUtc();
for (final DataSegment segment : dataSegments) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
if (!rule.appliesTo(segment, now)) {
// Rule did not match. Continue to the next Rule.
continue;
}
if (!rule.canLoadSegments()) {
// Rule matched but rule does not and cannot load segments.
// Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map
break;
}
if (computeUsingClusterView) {
rule.updateUnderReplicatedWithClusterView(
underReplicationCountsPerDataSourcePerTier,
segmentReplicantLookup,
cluster,
segment
);
} else {
rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment);
}
// Only the first matching rule applies. This is because the Coordinator cycle through all used segments
// and match each segment with the first rule that applies. Each segment may only match a single rule.
break;
}
}
return underReplicationCountsPerDataSourcePerTier;
}
private void becomeLeader()
{
synchronized (lock) {
@ -852,7 +900,7 @@ public class DruidCoordinator
startPeonsForNewServers(currentServers);
final DruidCluster cluster = prepareCluster(params, currentServers);
cluster = prepareCluster(params, currentServers);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout());
stopPeonsForDisappearedServers(currentServers);

View File

@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
@ -106,6 +107,21 @@ public abstract class BroadcastDistributionRule implements Rule
}
}
@Override
public void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
updateUnderReplicated(
underReplicatedPerTier,
segmentReplicantLookup,
segment
);
}
private CoordinatorStats assign(
final Set<ServerHolder> serverHolders,
final DataSegment segment

View File

@ -119,6 +119,34 @@ public abstract class LoadRule implements Rule
});
}
@Override
public void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
tier,
ignored -> new Object2LongOpenHashMap<>()
);
int possibleReplicants = Math.min(ruleReplicants, cluster.getHistoricals().get(tier).size());
log.debug(
"ruleReplicants: [%d], possibleReplicants: [%d], currentReplicants: [%d]",
ruleReplicants,
possibleReplicants,
currentReplicants
);
((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
segment.getDataSource(),
Math.max(possibleReplicants - currentReplicants, 0)
);
});
}
/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
@ -77,6 +78,22 @@ public interface Rule
Preconditions.checkArgument(!canLoadSegments());
}
/**
* This method should update the {@param underReplicatedPerTier} with the replication count of the
* {@param segment} taking into consideration the number of servers available in cluster that the segment can be
* replicated on. Rule that returns true for {@link Rule#canLoadSegments()} must override this method.
* Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount }
*/
default void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
Preconditions.checkArgument(!canLoadSegments());
}
/**
* {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used
* segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because

View File

@ -30,6 +30,7 @@ import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@ -85,7 +86,8 @@ public class CoordinatorResource
@Produces(MediaType.APPLICATION_JSON)
public Response getLoadStatus(
@QueryParam("simple") String simple,
@QueryParam("full") String full
@QueryParam("full") String full,
@QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView
)
{
if (simple != null) {
@ -93,7 +95,9 @@ public class CoordinatorResource
}
if (full != null) {
return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
return computeUsingClusterView != null
? Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView()).build() :
Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}

View File

@ -407,7 +407,8 @@ public class DataSourcesResource
@QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
@QueryParam("interval") @Nullable final String interval,
@QueryParam("simple") @Nullable final String simple,
@QueryParam("full") @Nullable final String full
@QueryParam("full") @Nullable final String full,
@QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView
)
{
if (forceMetadataRefresh == null) {
@ -452,8 +453,10 @@ public class DataSourcesResource
).build();
} else if (full != null) {
// Calculate response for full mode
Map<String, Object2LongMap<String>> segmentLoadMap
= coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
Map<String, Object2LongMap<String>> segmentLoadMap =
(computeUsingClusterView != null) ?
coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(segments.get()) :
coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
if (segmentLoadMap.isEmpty()) {
return Response.serverError()
.entity("Coordinator segment replicant lookup is not initialized yet. Try again later.")

View File

@ -116,6 +116,7 @@ public class JettyServerModule extends JerseyServletModule
binder.bind(CustomExceptionMapper.class).in(Singleton.class);
binder.bind(ForbiddenExceptionMapper.class).in(Singleton.class);
binder.bind(BadRequestExceptionMapper.class).in(Singleton.class);
binder.bind(ServiceUnavailableExceptionMapper.class).in(Singleton.class);
serve("/*").with(DruidGuiceContainer.class);

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.initialization.jetty;
/**
* This class is for any exceptions that should return a Service unavailable status code (503).
* See {@code BadQueryException} for query requests.
*
* @see ServiceUnavailableExceptionMapper
*/
public class ServiceUnavailableException extends RuntimeException
{
public ServiceUnavailableException(String msg)
{
super(msg);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.initialization.jetty;
import com.google.common.collect.ImmutableMap;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
@Provider
public class ServiceUnavailableExceptionMapper implements ExceptionMapper<ServiceUnavailableException>
{
@Override
public Response toResponse(ServiceUnavailableException exception)
{
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.type(MediaType.APPLICATION_JSON)
.entity(ImmutableMap.of("error", exception.getMessage()))
.build();
}
}

View File

@ -412,6 +412,24 @@ public class DruidCoordinatorTest extends CuratorTestBase
// The load rules asks for 2 replicas, therefore 1 replica should still be pending
Assert.assertEquals(1L, underRepliicationCountsPerDataSource.getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
Object2LongMap<String> underRepliicationCountsPerDataSourceUsingClusterView =
underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tier);
Assert.assertNotNull(underRepliicationCountsPerDataSourceUsingClusterView);
Assert.assertEquals(1, underRepliicationCountsPerDataSourceUsingClusterView.size());
//noinspection deprecation
Assert.assertNotNull(underRepliicationCountsPerDataSourceUsingClusterView.get(dataSource));
// Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
// The load rules asks for 2 replicas, but only 1 historical server in cluster. Since computing using cluster view
// the segments are replicated as many times as they can be given state of cluster, therefore should not be
// under-replicated.
Assert.assertEquals(0L, underRepliicationCountsPerDataSourceUsingClusterView.getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@ -499,6 +517,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@ -660,6 +684,14 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView =
coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView();
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTierUsingClusterView.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName2).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();

View File

@ -1201,7 +1201,7 @@ public class DataSourcesResourceTest
public void testGetDatasourceLoadstatusForceMetadataRefreshNull()
{
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null, null);
Assert.assertEquals(400, response.getStatus());
}
@ -1223,7 +1223,7 @@ public class DataSourcesResourceTest
null,
null
);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null);
Assert.assertEquals(204, response.getStatus());
}
@ -1281,7 +1281,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
@ -1297,7 +1297,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView);
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
@ -1360,7 +1360,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
@ -1376,7 +1376,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView);
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
@ -1431,7 +1431,64 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, druidCoordinator);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full");
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full", null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(2, ((Map) response.getEntity()).size());
Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size());
Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size());
Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1"));
Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1"));
EasyMock.verify(segmentsMetadataManager);
}
@Test
public void testGetDatasourceLoadstatusFullAndComputeUsingClusterView()
{
DataSegment datasource1Segment1 = new DataSegment(
"datasource1",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
10
);
DataSegment datasource1Segment2 = new DataSegment(
"datasource1",
Intervals.of("2010-01-22/P1D"),
"",
null,
null,
null,
null,
0x9,
20
);
List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
Object2LongMap<String> tier1 = new Object2LongOpenHashMap<>();
tier1.put("datasource1", 0L);
Object2LongMap<String> tier2 = new Object2LongOpenHashMap<>();
tier2.put("datasource1", 3L);
underReplicationCountsPerDataSourcePerTier.put("tier1", tier1);
underReplicationCountsPerDataSourcePerTier.put("tier2", tier2);
// Test when datasource fully loaded
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class);
EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(segments))
.andReturn(underReplicationCountsPerDataSourcePerTier).once();
EasyMock.replay(segmentsMetadataManager, druidCoordinator);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full", "computeUsingClusterView");
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(2, ((Map) response.getEntity()).size());