From ac93f752b0d9afbd49ebf4f440f09cde786714c6 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 11 Feb 2013 10:56:30 -0800 Subject: [PATCH] fix a case where a segment may get stuck in the terminate queue --- .../druid/master/ReplicationThrottler.java | 58 ++++++++++++------- .../metamx/druid/master/rules/LoadRule.java | 42 +++++++++----- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java index 3b3e90f78c2..85743143438 100644 --- a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java +++ b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java @@ -19,12 +19,13 @@ package com.metamx.druid.master; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.metamx.emitter.EmittingLogger; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentHashMap; /** * The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed. @@ -61,17 +62,20 @@ public class ReplicationThrottler int size = holder.getNumProcessing(tier); if (size != 0) { log.info( - "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]", + "[%s]: Replicant %s queue still has %d segments. Lifetime[%d]. Segments %s", tier, type, size, - holder.getLifetime(tier) + holder.getLifetime(tier), + holder.getCurrentlyProcessingSegmentsAndHosts(tier) ); holder.reduceLifetime(tier); lookup.put(tier, false); if (holder.getLifetime(tier) < 0) { - log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit(); + log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime) + .addData("segments", holder.getCurrentlyProcessingSegmentsAndHosts(tier)) + .emit(); } } else { log.info("[%s]: Replicant %s queue is empty.", tier, type); @@ -90,49 +94,49 @@ public class ReplicationThrottler return terminatingLookup.get(tier); } - public boolean registerReplicantCreation(String tier, String segmentId) + public boolean registerReplicantCreation(String tier, String segmentId, String serverId) { - return currentlyReplicating.addSegment(tier, segmentId); + return currentlyReplicating.addSegment(tier, segmentId, serverId); } - public void unregisterReplicantCreation(String tier, String segmentId) + public void unregisterReplicantCreation(String tier, String segmentId, String serverId) { - currentlyReplicating.removeSegment(tier, segmentId); + currentlyReplicating.removeSegment(tier, segmentId, serverId); } - public boolean registerReplicantTermination(String tier, String segmentId) + public boolean registerReplicantTermination(String tier, String segmentId, String serverId) { - return currentlyTerminating.addSegment(tier, segmentId); + return currentlyTerminating.addSegment(tier, segmentId, serverId); } - public void unregisterReplicantTermination(String tier, String segmentId) + public void unregisterReplicantTermination(String tier, String segmentId, String serverId) { - currentlyTerminating.removeSegment(tier, segmentId); + currentlyTerminating.removeSegment(tier, segmentId, serverId); } private class ReplicatorSegmentHolder { - private final Map> currentlyProcessingSegments = Maps.newHashMap(); + private final Map> currentlyProcessingSegments = Maps.newHashMap(); private final Map lifetimes = Maps.newHashMap(); - public boolean addSegment(String tier, String segmentId) + public boolean addSegment(String tier, String segmentId, String serverId) { - ConcurrentSkipListSet segments = currentlyProcessingSegments.get(tier); + ConcurrentHashMap segments = currentlyProcessingSegments.get(tier); if (segments == null) { - segments = new ConcurrentSkipListSet(); + segments = new ConcurrentHashMap(); currentlyProcessingSegments.put(tier, segments); } if (segments.size() < maxReplicants) { - segments.add(segmentId); + segments.put(segmentId, serverId); return true; } return false; } - public void removeSegment(String tier, String segmentId) + public void removeSegment(String tier, String segmentId, String serverId) { - Set segments = currentlyProcessingSegments.get(tier); + Map segments = currentlyProcessingSegments.get(tier); if (segments != null) { segments.remove(segmentId); } @@ -140,7 +144,7 @@ public class ReplicationThrottler public int getNumProcessing(String tier) { - Set segments = currentlyProcessingSegments.get(tier); + Map segments = currentlyProcessingSegments.get(tier); return (segments == null) ? 0 : segments.size(); } @@ -168,5 +172,17 @@ public class ReplicationThrottler { lifetimes.put(tier, maxLifetime); } + + public List getCurrentlyProcessingSegmentsAndHosts(String tier) + { + Map segments = currentlyProcessingSegments.get(tier); + List retVal = Lists.newArrayList(); + for (Map.Entry entry : segments.entrySet()) { + retVal.add( + String.format("%s ON %s", entry.getKey(), entry.getValue()) + ); + } + return retVal; + } } } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index a7b3b28358c..2f1819c9274 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -74,7 +74,7 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { - ServerHolder holder = serverQueue.pollFirst(); + final ServerHolder holder = serverQueue.pollFirst(); if (holder == null) { log.warn( "Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]", @@ -113,7 +113,11 @@ public abstract class LoadRule implements Rule if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster if (!replicationManager.canAddReplicant(getTier()) || - !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) { + !replicationManager.registerReplicantCreation( + getTier(), + segment.getIdentifier(), + holder.getServer().getHost() + )) { serverQueue.add(holder); break; } @@ -126,7 +130,11 @@ public abstract class LoadRule implements Rule @Override protected void execute() { - replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier()); + replicationManager.unregisterReplicantCreation( + getTier(), + segment.getIdentifier(), + holder.getServer().getHost() + ); } } ); @@ -174,21 +182,25 @@ public abstract class LoadRule implements Rule List droppedServers = Lists.newArrayList(); while (actualNumReplicantsForType > expectedNumReplicantsForType) { - ServerHolder holder = serverQueue.pollLast(); + final ServerHolder holder = serverQueue.pollLast(); if (holder == null) { log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); break; } - if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants - if (!replicationManager.canDestroyReplicant(getTier()) || - !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) { - serverQueue.add(holder); - break; - } - } - if (holder.isServingSegment(segment)) { + if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants + if (!replicationManager.canDestroyReplicant(getTier()) || + !replicationManager.registerReplicantTermination( + getTier(), + segment.getIdentifier(), + holder.getServer().getHost() + )) { + serverQueue.add(holder); + break; + } + } + holder.getPeon().dropSegment( segment, new LoadPeonCallback() @@ -196,7 +208,11 @@ public abstract class LoadRule implements Rule @Override protected void execute() { - replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier()); + replicationManager.unregisterReplicantTermination( + getTier(), + segment.getIdentifier(), + holder.getServer().getHost() + ); } } );