fix a case where a segment may get stuck in the terminate queue

This commit is contained in:
Fangjin Yang 2013-02-11 10:56:30 -08:00
parent 9ac5eeebb3
commit ac93f752b0
2 changed files with 66 additions and 34 deletions

View File

@ -19,12 +19,13 @@
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.emitter.EmittingLogger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentHashMap;
/**
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
@ -61,17 +62,20 @@ public class ReplicationThrottler
int size = holder.getNumProcessing(tier);
if (size != 0) {
log.info(
"[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
"[%s]: Replicant %s queue still has %d segments. Lifetime[%d]. Segments %s",
tier,
type,
size,
holder.getLifetime(tier)
holder.getLifetime(tier),
holder.getCurrentlyProcessingSegmentsAndHosts(tier)
);
holder.reduceLifetime(tier);
lookup.put(tier, false);
if (holder.getLifetime(tier) < 0) {
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime)
.addData("segments", holder.getCurrentlyProcessingSegmentsAndHosts(tier))
.emit();
}
} else {
log.info("[%s]: Replicant %s queue is empty.", tier, type);
@ -90,49 +94,49 @@ public class ReplicationThrottler
return terminatingLookup.get(tier);
}
public boolean registerReplicantCreation(String tier, String segmentId)
public boolean registerReplicantCreation(String tier, String segmentId, String serverId)
{
return currentlyReplicating.addSegment(tier, segmentId);
return currentlyReplicating.addSegment(tier, segmentId, serverId);
}
public void unregisterReplicantCreation(String tier, String segmentId)
public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
{
currentlyReplicating.removeSegment(tier, segmentId);
currentlyReplicating.removeSegment(tier, segmentId, serverId);
}
public boolean registerReplicantTermination(String tier, String segmentId)
public boolean registerReplicantTermination(String tier, String segmentId, String serverId)
{
return currentlyTerminating.addSegment(tier, segmentId);
return currentlyTerminating.addSegment(tier, segmentId, serverId);
}
public void unregisterReplicantTermination(String tier, String segmentId)
public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
{
currentlyTerminating.removeSegment(tier, segmentId);
currentlyTerminating.removeSegment(tier, segmentId, serverId);
}
private class ReplicatorSegmentHolder
{
private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
private final Map<String, Integer> lifetimes = Maps.newHashMap();
public boolean addSegment(String tier, String segmentId)
public boolean addSegment(String tier, String segmentId, String serverId)
{
ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
if (segments == null) {
segments = new ConcurrentSkipListSet<String>();
segments = new ConcurrentHashMap<String, String>();
currentlyProcessingSegments.put(tier, segments);
}
if (segments.size() < maxReplicants) {
segments.add(segmentId);
segments.put(segmentId, serverId);
return true;
}
return false;
}
public void removeSegment(String tier, String segmentId)
public void removeSegment(String tier, String segmentId, String serverId)
{
Set<String> segments = currentlyProcessingSegments.get(tier);
Map<String, String> segments = currentlyProcessingSegments.get(tier);
if (segments != null) {
segments.remove(segmentId);
}
@ -140,7 +144,7 @@ public class ReplicationThrottler
public int getNumProcessing(String tier)
{
Set<String> segments = currentlyProcessingSegments.get(tier);
Map<String, String> segments = currentlyProcessingSegments.get(tier);
return (segments == null) ? 0 : segments.size();
}
@ -168,5 +172,17 @@ public class ReplicationThrottler
{
lifetimes.put(tier, maxLifetime);
}
public List<String> getCurrentlyProcessingSegmentsAndHosts(String tier)
{
Map<String, String> segments = currentlyProcessingSegments.get(tier);
List<String> retVal = Lists.newArrayList();
for (Map.Entry<String, String> entry : segments.entrySet()) {
retVal.add(
String.format("%s ON %s", entry.getKey(), entry.getValue())
);
}
return retVal;
}
}
}

View File

@ -74,7 +74,7 @@ public abstract class LoadRule implements Rule
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst();
final ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
"Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]",
@ -113,7 +113,11 @@ public abstract class LoadRule implements Rule
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
!replicationManager.registerReplicantCreation(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
)) {
serverQueue.add(holder);
break;
}
@ -126,7 +130,11 @@ public abstract class LoadRule implements Rule
@Override
protected void execute()
{
replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
replicationManager.unregisterReplicantCreation(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
);
@ -174,21 +182,25 @@ public abstract class LoadRule implements Rule
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
serverQueue.add(holder);
break;
}
}
if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
)) {
serverQueue.add(holder);
break;
}
}
holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
@ -196,7 +208,11 @@ public abstract class LoadRule implements Rule
@Override
protected void execute()
{
replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
replicationManager.unregisterReplicantTermination(
getTier(),
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
);