Add safeguard to ensure newly added Rules are aware of Rule usage in the loadstatus API (#10054)

* Add safeguard to ensure newly added Rules are aware of Rule usage in the loadstatus API

* address comments

* address comments

* add tests
Maytas Monsereenusorn 2020-06-19 17:18:56 -10:00 committed by GitHub
parent c2f5d453f8
commit 191572ad5e
8 changed files with 156 additions and 40 deletions
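In essence, the safeguard works as follows: Rule gains a canLoadSegments() flag, and the default updateUnderReplicated() asserts that it is only ever reached by rules that cannot load segments, so a newly added loading rule that forgets to override it fails fast. A minimal stand-alone sketch of that contract (MiniRule and SafeguardDemo are illustrative names, not Druid classes; assumes Guava's Preconditions on the classpath):

import com.google.common.base.Preconditions;

public class SafeguardDemo
{
  // Stripped-down model of the new Rule contract; the real interface is in the Rule.java hunk below.
  interface MiniRule
  {
    boolean canLoadSegments();

    // Default fails fast for any rule that claims it can load segments but did not override this.
    default void updateUnderReplicated()
    {
      Preconditions.checkArgument(!canLoadSegments());
    }
  }

  public static void main(String[] args)
  {
    MiniRule dropLike = () -> false;     // drop-style rule: the default is a harmless no-op
    dropLike.updateUnderReplicated();    // passes

    MiniRule forgetful = () -> true;     // loading rule that forgot to override updateUnderReplicated
    try {
      forgetful.updateUnderReplicated(); // the safeguard trips
    } catch (IllegalArgumentException e) {
      System.out.println("safeguard tripped: loading Rules must override updateUnderReplicated");
    }
  }
}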

File: org/apache/druid/server/coordinator/DruidCoordinator.java

@@ -29,7 +29,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.DataSourcesSnapshot;
@@ -69,7 +68,6 @@ import org.apache.druid.server.coordinator.duty.LogUsedSegments;
 import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
-import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -275,13 +273,6 @@ public class DruidCoordinator
   )
   {
     final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
-    final Set<String> decommissioningServers = getDynamicConfigs().getDecommissioningNodes();
-    final List<ImmutableDruidServer> broadcastTargetServers = serverInventoryView
-        .getInventory()
-        .stream()
-        .filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost()))
-        .map(DruidServer::toImmutableDruidServer)
-        .collect(Collectors.toList());

     if (segmentReplicantLookup == null) {
       return underReplicationCountsPerDataSourcePerTier;
@@ -294,36 +285,19 @@ public class DruidCoordinator
       for (final Rule rule : rules) {
         if (!rule.appliesTo(segment, now)) {
           // Rule did not match. Continue to the next Rule.
           continue;
         }
-        if (rule instanceof LoadRule) {
-          ((LoadRule) rule)
-              .getTieredReplicants()
-              .forEach((final String tier, final Integer ruleReplicants) -> {
-                int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
-                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
-                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
-                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
-                    .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
-              });
+        if (!rule.canLoadSegments()) {
+          // Rule matched, but it does not and cannot load segments.
+          // Hence, there is no need to update the underReplicationCountsPerDataSourcePerTier map.
+          break;
         }
-        if (rule instanceof BroadcastDistributionRule) {
-          for (ImmutableDruidServer server : broadcastTargetServers) {
-            Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
-                .computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>());
-            if (server.getSegment(segment.getId()) == null) {
-              ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
-                  .addTo(segment.getDataSource(), 1);
-            } else {
-              // This makes sure that every datasource has an entry even if all segments are loaded
-              underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
-            }
-          }
-        }

-        // only the first matching rule applies
+        rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment);
+
+        // Only the first matching rule applies. This is because the Coordinator cycles through all used segments
+        // and matches each segment with the first rule that applies. Each segment may only match a single rule.
         break;
       }
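The map this loop fills has the shape tier -> { dataSource -> underReplicationCount }. A small sketch of how an entry is produced and read back (hypothetical tier and datasource names, assuming fastutil on the classpath as in the patch):

import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

import java.util.HashMap;
import java.util.Map;

public class UnderReplicationMapShape
{
  public static void main(String[] args)
  {
    // tier -> { dataSource -> underReplicationCount }, as built by the loop above
    Map<String, Object2LongMap<String>> underReplicatedPerTier = new HashMap<>();

    // A load rule wanting 2 replicas on "hot" while only 1 is loaded contributes 1 for its datasource
    ((Object2LongOpenHashMap<String>) underReplicatedPerTier
        .computeIfAbsent("hot", ignored -> new Object2LongOpenHashMap<>()))
        .addTo("wikipedia", Math.max(2 - 1, 0));

    System.out.println(underReplicatedPerTier.get("hot").getLong("wikipedia")); // prints 1
  }
}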

File: org/apache/druid/server/coordinator/SegmentReplicantLookup.java

@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;

@@ -62,19 +64,22 @@ public class SegmentReplicantLookup
       }
     }

-    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
+    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, cluster);
   }

   private final Table<SegmentId, String, Integer> segmentsInCluster;
   private final Table<SegmentId, String, Integer> loadingSegments;
+  private final DruidCluster cluster;

   private SegmentReplicantLookup(
       Table<SegmentId, String, Integer> segmentsInCluster,
-      Table<SegmentId, String, Integer> loadingSegments
+      Table<SegmentId, String, Integer> loadingSegments,
+      DruidCluster cluster
   )
   {
     this.segmentsInCluster = segmentsInCluster;
     this.loadingSegments = loadingSegments;
+    this.cluster = cluster;
   }

   public Map<String, Integer> getClusterTiers(SegmentId segmentId)

@@ -93,7 +98,7 @@ public class SegmentReplicantLookup
     return retVal;
   }

-  int getLoadedReplicants(SegmentId segmentId, String tier)
+  public int getLoadedReplicants(SegmentId segmentId, String tier)
   {
     Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
@@ -124,4 +129,21 @@ public class SegmentReplicantLookup
   {
     return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
   }
+
+  public Object2LongMap<String> getBroadcastUnderReplication(SegmentId segmentId)
+  {
+    Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
+    for (ServerHolder holder : cluster.getAllServers()) {
+      // Only record a tier entry for servers that are segment broadcast targets
+      if (holder.getServer().getType().isSegmentBroadcastTarget()) {
+        // Every broadcast target server should be serving 1 replica of the segment
+        if (!holder.isServingSegment(segmentId)) {
+          perTier.addTo(holder.getServer().getTier(), 1L);
+        } else {
+          perTier.putIfAbsent(holder.getServer().getTier(), 0);
+        }
+      }
+    }
+    return perTier;
+  }
 }
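The new getBroadcastUnderReplication tallies, per tier, how many broadcast-target servers are missing the segment, pinning fully loaded tiers at 0 so they still appear in the result. A rough stand-alone illustration of that tally (BroadcastServer is a hypothetical stand-in, not the Druid ServerHolder API):

import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

public class BroadcastTallySketch
{
  // Hypothetical stand-in for a broadcast-target server: its tier and whether it serves the segment
  static final class BroadcastServer
  {
    final String tier;
    final boolean servingSegment;

    BroadcastServer(String tier, boolean servingSegment)
    {
      this.tier = tier;
      this.servingSegment = servingSegment;
    }
  }

  public static void main(String[] args)
  {
    BroadcastServer[] servers = {
        new BroadcastServer("tier1", true),
        new BroadcastServer("tier1", false), // one server is missing the segment
        new BroadcastServer("tier2", true),
    };

    Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
    for (BroadcastServer server : servers) {
      if (!server.servingSegment) {
        perTier.addTo(server.tier, 1L);
      } else {
        // Record a 0 entry so fully loaded tiers still show up in the result
        perTier.putIfAbsent(server.tier, 0L);
      }
    }

    System.out.println(perTier); // e.g. {tier1=>1, tier2=>0}
  }
}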

File: org/apache/druid/server/coordinator/ServerHolder.java

@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;

 import java.util.Objects;

@@ -114,7 +115,7 @@ public class ServerHolder implements Comparable<ServerHolder>
   public boolean isServingSegment(DataSegment segment)
   {
-    return server.getSegment(segment.getId()) != null;
+    return isServingSegment(segment.getId());
   }

   public boolean isLoadingSegment(DataSegment segment)

@@ -132,6 +133,11 @@ public class ServerHolder implements Comparable<ServerHolder>
     return peon.getNumberOfSegmentsInQueue();
   }

+  public boolean isServingSegment(SegmentId segmentId)
+  {
+    return server.getSegment(segmentId) != null;
+  }
+
   @Override
   public int compareTo(ServerHolder serverHolder)
   {

File: org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java

@@ -19,15 +19,19 @@
 package org.apache.druid.server.coordinator.rules;

+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;

 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -71,6 +75,37 @@ public abstract class BroadcastDistributionRule implements Rule
         .accumulate(drop(dropServerHolders, segment));
   }

+  @Override
+  public boolean canLoadSegments()
+  {
+    return true;
+  }
+
+  @Override
+  public void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    Object2LongMap<String> underReplicatedBroadcastTiers = segmentReplicantLookup.getBroadcastUnderReplication(segment.getId());
+    for (final Object2LongMap.Entry<String> entry : underReplicatedBroadcastTiers.object2LongEntrySet()) {
+      final String tier = entry.getKey();
+      final long underReplicatedCount = entry.getLongValue();
+      underReplicatedPerTier.compute(tier, (_tier, existing) -> {
+        Object2LongMap<String> underReplicationPerDataSource = existing;
+        if (existing == null) {
+          underReplicationPerDataSource = new Object2LongOpenHashMap<>();
+        }
+        underReplicationPerDataSource.compute(
+            segment.getDataSource(),
+            (_datasource, count) -> count != null ? count + underReplicatedCount : underReplicatedCount
+        );
+        return underReplicationPerDataSource;
+      });
+    }
+  }
+
   private CoordinatorStats assign(
       final Set<ServerHolder> serverHolders,
       final DataSegment segment
File: org/apache/druid/server/coordinator/rules/DropRule.java

@@ -37,4 +37,10 @@ public abstract class DropRule implements Rule
     stats.addToGlobalStat("deletedCount", 1);
     return stats;
   }
+
+  @Override
+  public boolean canLoadSegments()
+  {
+    return false;
+  }
 }

File: org/apache/druid/server/coordinator/rules/LoadRule.java

@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.rules;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;

@@ -30,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;

@@ -90,6 +93,32 @@ public abstract class LoadRule implements Rule
     }
   }

+  @Override
+  public boolean canLoadSegments()
+  {
+    return true;
+  }
+
+  @Override
+  public void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
+      int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
+      Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
+          tier,
+          ignored -> new Object2LongOpenHashMap<>()
+      );
+      ((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
+          segment.getDataSource(),
+          Math.max(ruleReplicants - currentReplicants, 0)
+      );
+    });
+  }
+
   /**
    * @param stats {@link CoordinatorStats} to accumulate assignment statistics.
    */
File: org/apache/druid/server/coordinator/rules/Rule.java

@@ -21,13 +21,18 @@ package org.apache.druid.server.coordinator.rules;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;

+import java.util.Map;
+
 /**
 */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")

@@ -51,6 +56,27 @@ public interface Rule
   boolean appliesTo(Interval interval, DateTime referenceTimestamp);

+  /**
+   * Returns true if this Rule can load segments onto one or more types of Druid node, otherwise returns false.
+   * Any Rule that returns true from this method should implement logic for calculating segment under-replication
+   * in {@link Rule#updateUnderReplicated}.
+   */
+  boolean canLoadSegments();
+
+  /**
+   * This method should update {@param underReplicatedPerTier} with the under-replication count of the
+   * {@param segment}. Any Rule that returns true from {@link Rule#canLoadSegments()} must override this method.
+   * Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount }.
+   */
+  default void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    Preconditions.checkArgument(!canLoadSegments());
+  }
+
   /**
    * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used
    * segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because
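To make the contract concrete, here is a minimal sketch of what a hypothetical new loading rule would have to provide (SingleTierRule and the "_default_tier" constant are illustrative, not from the patch; the class is left abstract so the pre-existing Rule methods such as appliesTo and run stay out of scope):

import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;

public abstract class SingleTierRule implements Rule
{
  @Override
  public boolean canLoadSegments()
  {
    // This rule loads segments, so it must also override updateUnderReplicated;
    // otherwise the default implementation's Preconditions check would fail.
    return true;
  }

  @Override
  public void updateUnderReplicated(
      Map<String, Object2LongMap<String>> underReplicatedPerTier,
      SegmentReplicantLookup segmentReplicantLookup,
      DataSegment segment
  )
  {
    // Hypothetical policy: expect exactly one replica on a single hard-coded tier.
    long loaded = segmentReplicantLookup.getLoadedReplicants(segment.getId(), "_default_tier");
    ((Object2LongOpenHashMap<String>) underReplicatedPerTier
        .computeIfAbsent("_default_tier", ignored -> new Object2LongOpenHashMap<>()))
        .addTo(segment.getDataSource(), Math.max(1 - loaded, 0));
  }
}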

File: org/apache/druid/server/coordinator/ServerHolderTest.java

@@ -39,7 +39,7 @@ public class ServerHolderTest
 {
   private static final List<DataSegment> SEGMENTS = ImmutableList.of(
       new DataSegment(
-          "test",
+          "src1",
           Intervals.of("2015-04-12/2015-04-13"),
           "1",
           ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"),

@@ -50,7 +50,7 @@ public class ServerHolderTest
           1
       ),
       new DataSegment(
-          "test",
+          "src2",
           Intervals.of("2015-04-12/2015-04-13"),
           "1",
           ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"),

@@ -177,4 +177,22 @@ public class ServerHolderTest
     Assert.assertNotEquals(h1, h4);
     Assert.assertNotEquals(h1, h5);
   }
+
+  @Test
+  public void testIsServingSegment()
+  {
+    final ServerHolder h1 = new ServerHolder(
+        new ImmutableDruidServer(
+            new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0),
+            0L,
+            ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
+            1
+        ),
+        new LoadQueuePeonTester()
+    );
+    Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
+    Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
+    Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId()));
+    Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId()));
+  }
 }