print replication levels in coordinator segment logs (#12511)

* print replication levels in coordinator segment logs

* add served segment count to stats

* also for drops
This commit is contained in:
Clint Wylie 2022-05-17 02:24:13 -07:00 committed by GitHub
parent 0fd4f1e386
commit b23ddc5939
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 23 deletions

View File

@ -240,12 +240,13 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
log.info(
"Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.",
"Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d served, %,d bytes queued, %,d bytes served.",
server.getName(),
server.getType().toString(),
server.getTier(),
queuePeon.getSegmentsToLoad().size(),
queuePeon.getSegmentsToDrop().size(),
server.getNumSegments(),
queuePeon.getLoadQueueSize(),
server.getCurrSize()
);

View File

@ -239,10 +239,10 @@ public abstract class LoadRule implements Rule
final String tier = entry.getKey();
String noAvailability = StringUtils.format(
"No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
"No available [%s] servers or node capacity to assign primary segment [%s]! %s",
tier,
segment.getId(),
targetReplicantsInTier
getReplicationLogString()
);
final List<ServerHolder> holders = getFilteredHolders(
@ -299,7 +299,7 @@ public abstract class LoadRule implements Rule
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
if (tier.equals(tierToSkip)) {
log.info("Skipping replica assignment for tier [%s]", tier);
log.info("Skipping replica assignment for segment [%s] to tier [%s]", segment.getId(), tier);
continue;
}
final int numAssigned = assignReplicasForTier(
@ -334,10 +334,10 @@ public abstract class LoadRule implements Rule
}
String noAvailability = StringUtils.format(
"No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
"No available [%s] servers or node capacity to assign segment [%s]! %s",
tier,
segment.getId(),
targetReplicantsInTier
getReplicationLogString()
);
final List<ServerHolder> holders = getFilteredHolders(tier, params.getDruidCluster(), predicate);
@ -350,7 +350,7 @@ public abstract class LoadRule implements Rule
final ReplicationThrottler throttler = params.getReplicationManager();
for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
if (!throttler.canCreateReplicant(tier)) {
log.info("Throttling replication for segment [%s] in tier [%s]", segment.getId(), tier);
log.info("Throttling replication for segment [%s] in tier [%s]. %s", segment.getId(), tier, getReplicationLogString());
return numAssigned;
}
@ -371,10 +371,11 @@ public abstract class LoadRule implements Rule
final String holderHost = holder.getServer().getHost();
throttler.registerReplicantCreation(tier, segmentId, holderHost);
log.info(
"Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
"Assigning 'replica' for segment [%s] to server [%s] in tier [%s]. %s",
segment.getId(),
holder.getServer().getName(),
holder.getServer().getTier()
holder.getServer().getTier(),
getReplicationLogString()
);
holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId));
}
@ -393,11 +394,8 @@ public abstract class LoadRule implements Rule
{
final DruidCluster druidCluster = params.getDruidCluster();
// This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
if (loadingInProgress(druidCluster)) {
log.info("Loading in progress, skipping drop until loading is complete");
return;
}
final boolean isLoading = loadingInProgress(druidCluster);
for (final Object2IntMap.Entry<String> entry : currentReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
@ -412,7 +410,23 @@ public abstract class LoadRule implements Rule
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
if (numToDrop > 0) {
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
// This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
if (isLoading) {
log.info(
"Loading in progress for segment [%s], skipping drop from tier [%s] until loading is complete! %s",
segment.getId(),
tier,
getReplicationLogString()
);
break;
}
numDropped = dropForTier(
numToDrop,
holders,
segment,
params.getBalancerStrategy(),
getReplicationLogString()
);
} else {
numDropped = 0;
}
@ -442,7 +456,8 @@ public abstract class LoadRule implements Rule
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment,
final BalancerStrategy balancerStrategy
final BalancerStrategy balancerStrategy,
final String replicationLog
)
{
Map<Boolean, TreeSet<ServerHolder>> holders = holdersInTier.stream()
@ -453,9 +468,9 @@ public abstract class LoadRule implements Rule
));
TreeSet<ServerHolder> decommissioningServers = holders.get(true);
TreeSet<ServerHolder> activeServers = holders.get(false);
int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop);
int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop, replicationLog);
if (left > 0) {
left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left);
left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left, replicationLog);
}
if (left != 0) {
log.warn("I have no servers serving [%s]?", segment.getId());
@ -464,9 +479,11 @@ public abstract class LoadRule implements Rule
}
private static int dropSegmentFromServers(
BalancerStrategy balancerStrategy,
DataSegment segment,
NavigableSet<ServerHolder> holders, int numToDrop
final BalancerStrategy balancerStrategy,
final DataSegment segment,
final NavigableSet<ServerHolder> holders,
int numToDrop,
final String replicationLog
)
{
final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, holders);
@ -479,10 +496,11 @@ public abstract class LoadRule implements Rule
final ServerHolder holder = iterator.next();
if (holder.isServingSegment(segment)) {
log.info(
"Dropping segment [%s] on server [%s] in tier [%s]",
"Dropping segment [%s] on server [%s] in tier [%s]. %s",
segment.getId(),
holder.getServer().getName(),
holder.getServer().getTier()
holder.getServer().getTier(),
replicationLog
);
holder.getPeon().dropSegment(segment, null);
numToDrop--;
@ -515,4 +533,21 @@ public abstract class LoadRule implements Rule
public abstract Map<String, Integer> getTieredReplicants();
public abstract int getNumReplicants(String tier);
/**
 * Builds a short, human-readable summary of per-tier replication for use in log messages.
 *
 * The result has the form {@code Current replication: [[hot:1/2][cold:2/2]]}: one
 * {@code [tier:current/target]} group per entry of {@code currentReplicants}, where the
 * target value is looked up by tier name in {@code targetReplicants}.
 *
 * @return the formatted replication summary; {@code Current replication: []} when
 *         {@code currentReplicants} has no entries
 */
protected String getReplicationLogString()
{
  final StringBuilder summary = new StringBuilder("Current replication: [");
  for (final Object2IntMap.Entry<String> tierEntry : currentReplicants.object2IntEntrySet()) {
    final String tierName = tierEntry.getKey();
    final int current = tierEntry.getIntValue();
    final int target = targetReplicants.getInt(tierName);
    // One group per tier, rendered as [tier:current/target].
    summary.append("[").append(tierName).append(":");
    summary.append(current).append("/").append(target);
    summary.append("]");
  }
  summary.append("]");
  return summary.toString();
}
}