More coordinator logging to help give context to load queue peon log messages (#5929)

* more coordinator logging to help give context to load queue peon log messages

* fix style

* more chill load queue peon log messages
Clint Wylie 2018-07-04 23:40:25 -07:00 committed by Gian Merlino
parent 0a472d3fa0
commit 39371b0ff8
5 changed files with 106 additions and 84 deletions
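The common thread in the peon changes below is demoting high-volume per-segment messages from INFO to DEBUG, while the coordinator side gains INFO messages that give those peon lines context. A minimal sketch of that pattern, using java.util.logging rather than Druid's EmittingLogger API; the class name, path, and segment id are hypothetical:

import java.util.logging.Level;
import java.util.logging.Logger;

public class PeonLoggingSketch
{
  private static final Logger log = Logger.getLogger(PeonLoggingSketch.class.getName());

  public static void main(String[] args)
  {
    // Hypothetical identifiers, for illustration only.
    String basePath = "/druid/loadQueue/historical-01";
    String segmentId = "wikipedia_2018-07-01T00:00:00.000Z";

    // Per-segment chatter fires once for every queued segment, so it is
    // demoted to a debug-level message (FINE is java.util.logging's closest
    // equivalent of DEBUG).
    log.log(Level.FINE, String.format("Asking server peon[%s] to load segment[%s]", basePath, segmentId));

    // Low-volume coordinator decisions stay at INFO so operators keep context
    // in the default log output.
    log.log(Level.INFO, String.format("Assigning 'primary' for segment [%s]", segmentId));
  }
}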

CuratorLoadQueuePeon.java

@@ -174,7 +174,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
         }
       }
-      log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
+      log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
       queuedSize.addAndGet(segment.getSize());
       segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
     }
@@ -205,7 +205,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
         }
       }
-      log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
+      log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
       segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
     }
@@ -244,16 +244,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       }
       try {
-        if (currentlyProcessing == null) {
-          if (!stopped) {
-            log.makeAlert("Crazy race condition! server[%s]", basePath)
-               .emit();
-          }
-          actionCompleted();
-          return;
-        }
-        log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
         final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
         final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
         curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
@@ -397,7 +387,12 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
         return;
       }
       actionCompleted();
-      log.info("Server[%s] done processing [%s]", basePath, path);
+      log.info(
+          "Server[%s] done processing %s of segment [%s]",
+          basePath,
+          currentlyProcessing.getType() == LOAD ? "load" : "drop",
+          path
+      );
     }
   }
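The reworked completion message folds the change-request type into the log line, so a "done processing" entry can be matched against the coordinator's earlier load or drop decision. A standalone sketch of that message construction; RequestType stands in for the peon's LOAD/DROP change-request types and all names are illustrative:

public class CompletionMessageSketch
{
  enum RequestType { LOAD, DROP }

  // Builds the enriched "done processing" line: the request type makes the
  // entry matchable against the coordinator's earlier load/drop decision.
  static String doneMessage(String basePath, RequestType type, String path)
  {
    return String.format(
        "Server[%s] done processing %s of segment [%s]",
        basePath,
        type == RequestType.LOAD ? "load" : "drop",
        path
    );
  }

  public static void main(String[] args)
  {
    // Hypothetical paths, for illustration only.
    String basePath = "/druid/loadQueue/historical-01";
    String segmentPath = basePath + "/wikipedia_2018-07-01";
    System.out.println(doneMessage(basePath, RequestType.LOAD, segmentPath));
    System.out.println(doneMessage(basePath, RequestType.DROP, segmentPath));
  }
}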

DruidCoordinator.java

@@ -696,11 +696,7 @@ public class DruidCoordinator
     super(
         ImmutableList.of(
             new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
-            new DruidCoordinatorHelper()
-            {
-              @Override
-              public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-              {
+            params -> {
               // Display info about all historical servers
               Iterable<ImmutableDruidServer> servers = FunctionalIterable
                   .create(serverInventoryView.getInventory())
@@ -752,7 +748,6 @@ public class DruidCoordinator
                   .withSegmentReplicantLookup(segmentReplicantLookup)
                   .withBalancerReferenceTimestamp(DateTimes.nowUtc())
                   .build();
-              }
             },
             new DruidCoordinatorRuleRunner(DruidCoordinator.this),
             new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
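The DruidCoordinator change above is a pure syntax refactor: DruidCoordinatorHelper has a single run method, so the anonymous class collapses into a lambda. A minimal sketch of the same transformation, with java.util.function.UnaryOperator as a stand-in functional interface (its apply method takes and returns one value, like run does with DruidCoordinatorRuntimeParams):

import java.util.function.UnaryOperator;

public class LambdaRefactorSketch
{
  public static void main(String[] args)
  {
    // Before: a single-method anonymous class.
    UnaryOperator<String> before = new UnaryOperator<String>()
    {
      @Override
      public String apply(String params)
      {
        return params + " (processed)";
      }
    };

    // After: the equivalent lambda, as in the DruidCoordinator change above.
    UnaryOperator<String> after = params -> params + " (processed)";

    System.out.println(before.apply("runtimeParams"));
    System.out.println(after.apply("runtimeParams"));
  }
}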

HttpLoadQueuePeon.java

@@ -390,7 +390,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       SegmentHolder holder = segmentsToLoad.get(segment);
       if (holder == null) {
-        log.info("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
+        log.debug("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
         segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
         processingExecutor.execute(this::doSegmentManagement);
       } else {
@@ -415,7 +415,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       SegmentHolder holder = segmentsToDrop.get(segment);
       if (holder == null) {
-        log.info("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
+        log.debug("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
         segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback));
         processingExecutor.execute(this::doSegmentManagement);
       } else {

DruidCoordinatorCleanupUnneeded.java

@@ -26,7 +26,6 @@ import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinator;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import io.druid.server.coordinator.LoadPeonCallback;
 import io.druid.server.coordinator.LoadQueuePeon;
 import io.druid.server.coordinator.ServerHolder;
 import io.druid.timeline.DataSegment;
@@ -74,15 +73,16 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
           if (!queuePeon.getSegmentsToDrop().contains(segment)) {
             queuePeon.dropSegment(
-                segment, new LoadPeonCallback()
-                {
-                  @Override
-                  public void execute()
-                  {
-                  }
+                segment, () -> {
                 }
             );
             stats.addToTieredStat("unneededCount", server.getTier(), 1);
+            log.info(
+                "Dropping unneeded segment [%s] from server [%s] in tier [%s]",
+                segment.getIdentifier(),
+                server.getName(),
+                server.getTier()
+            );
           }
         }
       }
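Here the refactor is the same lambda conversion in its degenerate form: a do-nothing LoadPeonCallback shrinks to "() -> {}". A tiny sketch, with Runnable standing in for LoadPeonCallback since both declare a single no-argument, void method:

public class NoOpCallbackSketch
{
  public static void main(String[] args)
  {
    // Before: a do-nothing anonymous implementation of a one-method interface.
    Runnable before = new Runnable()
    {
      @Override
      public void run()
      {
      }
    };

    // After: the same no-op as a lambda, mirroring "segment, () -> {}" above.
    Runnable after = () -> {};

    before.run();
    after.run();
  }
}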

LoadRule.java

@@ -20,6 +20,7 @@
 package io.druid.server.coordinator.rules;

 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.server.coordinator.BalancerStrategy;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -174,6 +175,13 @@ public abstract class LoadRule implements Rule
     }

     final String tier = entry.getKey();
+    String noAvailability = StringUtils.format(
+        "No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
+        tier,
+        segment.getIdentifier(),
+        targetReplicantsInTier
+    );
+
     final List<ServerHolder> holders = getFilteredHolders(
         tier,
         params.getDruidCluster(),
@@ -181,16 +189,13 @@ public abstract class LoadRule implements Rule
     );

     // no holders satisfy the predicate
     if (holders.isEmpty()) {
+      log.warn(noAvailability);
       continue;
     }
     final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
     if (candidate == null) {
-      log.warn(
-          "No available [%s] servers or node capacity to assign primary segment[%s]! " +
-          "Expected Replicants[%d]",
-          tier, segment.getIdentifier(), targetReplicantsInTier
-      );
+      log.warn(noAvailability);
     } else {
       // cache the result for later use.
       strategyCache.put(tier, candidate);
@@ -204,6 +209,12 @@ public abstract class LoadRule implements Rule
     if (topCandidate != null) {
       // remove tier for primary replica
       strategyCache.remove(topCandidate.getServer().getTier());
+      log.info(
+          "Assigning 'primary' for segment [%s] to server [%s] in tier [%s]",
+          segment.getIdentifier(),
+          topCandidate.getServer().getName(),
+          topCandidate.getServer().getTier()
+      );
       topCandidate.getPeon().loadSegment(segment, null);
     }
@@ -225,6 +236,7 @@ public abstract class LoadRule implements Rule
     for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
       final String tier = entry.getKey();
       if (tier.equals(tierToSkip)) {
+        log.info("Skipping replica assignment for tier [%s]", tier);
         continue;
       }
       final int numAssigned = assignReplicasForTier(
@@ -235,6 +247,7 @@ public abstract class LoadRule implements Rule
           createLoadQueueSizeLimitingPredicate(params),
           segment
       );
+      log.info("Assigned %d replicas in tier [%s]", numAssigned, tier);
       stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
     }
   }
@@ -257,15 +270,24 @@ public abstract class LoadRule implements Rule
       return 0;
     }

+    String noAvailability = StringUtils.format(
+        "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
+        tier,
+        segment.getIdentifier(),
+        targetReplicantsInTier
+    );
+
     final List<ServerHolder> holders = getFilteredHolders(tier, params.getDruidCluster(), predicate);
     // if no holders available for assignment
     if (holders.isEmpty()) {
+      log.warn(noAvailability);
       return 0;
     }

     final ReplicationThrottler throttler = params.getReplicationManager();
     for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
       if (!throttler.canCreateReplicant(tier)) {
+        log.info("Throttling replication for segment [%s] in tier [%s]", segment.getIdentifier(), tier);
         return numAssigned;
       }
@@ -277,10 +299,7 @@ public abstract class LoadRule implements Rule
       }

       if (holder == null) {
-        log.warn(
-            "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
-            tier, segment.getIdentifier(), targetReplicantsInTier
-        );
+        log.warn(noAvailability);
         return numAssigned;
       }
       holders.remove(holder);
@@ -288,6 +307,12 @@ public abstract class LoadRule implements Rule
       final String segmentId = segment.getIdentifier();
       final String holderHost = holder.getServer().getHost();
       throttler.registerReplicantCreation(tier, segmentId, holderHost);
+      log.info(
+          "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
+          segment.getIdentifier(),
+          holder.getServer().getName(),
+          holder.getServer().getTier()
+      );
       holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId, holderHost));
     }
@@ -307,6 +332,7 @@ public abstract class LoadRule implements Rule
     // This enforces that loading is completed before we attempt to drop stuff as a safety measure.
     if (loadingInProgress(druidCluster)) {
+      log.info("Loading in progress, skipping drop until loading is complete");
       return;
     }
@@ -372,6 +398,12 @@ public abstract class LoadRule implements Rule
       final ServerHolder holder = iterator.next();
       if (holder.isServingSegment(segment)) {
+        log.info(
+            "Dropping segment [%s] on server [%s] in tier [%s]",
+            segment.getIdentifier(),
+            holder.getServer().getName(),
+            holder.getServer().getTier()
+        );
         holder.getPeon().dropSegment(segment, null);
         ++numDropped;
       } else {
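A recurring pattern in the LoadRule changes above is hoisting a duplicated warning into a single noAvailability string that is built once per tier and reused at every early-exit branch. A self-contained sketch of that refactor; String.format stands in for Druid's StringUtils.format, and the tier name, segment id, and branch flags are hypothetical:

public class SharedWarningSketch
{
  public static void main(String[] args)
  {
    // Hypothetical values, for illustration only.
    String tier = "_default_tier";
    String segmentId = "wikipedia_2018-07-01";
    int targetReplicantsInTier = 2;

    // Build the message once, up front.
    String noAvailability = String.format(
        "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
        tier,
        segmentId,
        targetReplicantsInTier
    );

    boolean holdersEmpty = true; // stands in for holders.isEmpty()
    boolean noCandidate = true;  // stands in for holder == null

    // Each early-exit branch reuses the same string instead of repeating the
    // format call, so the two warnings can never drift apart.
    if (holdersEmpty) {
      System.out.println("WARN: " + noAvailability);
    }
    if (noCandidate) {
      System.out.println("WARN: " + noAvailability);
    }
  }
}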