fix bug with enabling segments and improve replication logic

fjy 2013-07-18 13:24:52 -07:00
parent cef0e50881
commit 7d58a2d8db
3 changed files with 41 additions and 39 deletions

DatabaseSegmentManager.java

@@ -20,7 +20,6 @@
 package com.metamx.druid.db;

 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
@@ -33,7 +32,7 @@ import com.metamx.druid.TimelineObjectHolder;
 import com.metamx.druid.VersionedIntervalTimeline;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidDataSource;
+import com.metamx.druid.partition.PartitionChunk;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -45,7 +44,6 @@ import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.tweak.HandleCallback;

-import javax.annotation.Nullable;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -175,17 +173,16 @@ public class DatabaseSegmentManager
           }
       );

-      final List<DataSegment> segments = Lists.transform(
-          segmentTimeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01"))),
-          new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
-          {
-            @Override
-            public DataSegment apply(@Nullable TimelineObjectHolder<String, DataSegment> input)
-            {
-              return input.getObject().getChunk(0).getObject();
-            }
-          }
-      );
+      final List<DataSegment> segments = Lists.newArrayList();
+      for (TimelineObjectHolder<String, DataSegment> objectHolder : segmentTimeline.lookup(
+          new Interval(
+              "0000-01-01/3000-01-01"
+          )
+      )) {
+        for (PartitionChunk<DataSegment> partitionChunk : objectHolder.getObject()) {
+          segments.add(partitionChunk.getObject());
+        }
+      }

       if (segments.isEmpty()) {
         log.warn("No segments found in the database!");
@@ -451,4 +448,4 @@ public class DatabaseSegmentManager
       log.error(e, "Problem polling DB.");
     }
   }
-}
\ No newline at end of file
+}
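
The substance of the enabling-segments fix: the old Lists.transform callback kept only getChunk(0) from each TimelineObjectHolder, so when a datasource was enabled, every partition of a sharded segment after the first was silently dropped; the new loop collects every PartitionChunk (and builds the lookup Interval from the ISO string "0000-01-01/3000-01-01" rather than DateTime endpoints). A minimal runnable sketch of the behavioral difference, with Holder as a hypothetical stand-in for the Druid timeline types:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ChunkIterationSketch
    {
      // Hypothetical stand-in for TimelineObjectHolder: one interval/version
      // holding the partition chunks of a (possibly sharded) segment.
      static class Holder
      {
        private final List<String> chunks;

        Holder(String... chunks)
        {
          this.chunks = Arrays.asList(chunks);
        }

        List<String> getObject()
        {
          return chunks;
        }
      }

      public static void main(String[] args)
      {
        List<Holder> timeline = Arrays.asList(new Holder("seg_part0", "seg_part1", "seg_part2"));

        // Old behavior: only chunk 0 of each holder survived the transform,
        // so the other partitions of a sharded segment were never enabled.
        List<String> oldSegments = new ArrayList<String>();
        for (Holder holder : timeline) {
          oldSegments.add(holder.getObject().get(0));
        }

        // New behavior: every partition chunk is collected.
        List<String> newSegments = new ArrayList<String>();
        for (Holder holder : timeline) {
          newSegments.addAll(holder.getObject());
        }

        System.out.println(oldSegments); // [seg_part0]
        System.out.println(newSegments); // [seg_part0, seg_part1, seg_part2]
      }
    }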

ReplicationThrottler.java

@@ -84,19 +84,19 @@ public class ReplicationThrottler
     }
   }

-  public boolean canAddReplicant(String tier)
+  public boolean canCreateReplicant(String tier)
   {
-    return replicatingLookup.get(tier);
+    return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier);
   }

   public boolean canDestroyReplicant(String tier)
   {
-    return terminatingLookup.get(tier);
+    return terminatingLookup.get(tier) && !currentlyTerminating.isAtMaxReplicants(tier);
   }

-  public boolean registerReplicantCreation(String tier, String segmentId, String serverId)
+  public void registerReplicantCreation(String tier, String segmentId, String serverId)
   {
-    return currentlyReplicating.addSegment(tier, segmentId, serverId);
+    currentlyReplicating.addSegment(tier, segmentId, serverId);
   }

   public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
@@ -104,9 +104,9 @@ public class ReplicationThrottler
     currentlyReplicating.removeSegment(tier, segmentId, serverId);
   }

-  public boolean registerReplicantTermination(String tier, String segmentId, String serverId)
+  public void registerReplicantTermination(String tier, String segmentId, String serverId)
   {
-    return currentlyTerminating.addSegment(tier, segmentId, serverId);
+    currentlyTerminating.addSegment(tier, segmentId, serverId);
   }

   public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
@@ -119,19 +119,23 @@ public class ReplicationThrottler
     private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
     private final Map<String, Integer> lifetimes = Maps.newHashMap();

-    public boolean addSegment(String tier, String segmentId, String serverId)
+    public boolean isAtMaxReplicants(String tier)
+    {
+      final ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
+      return (segments != null && segments.size() >= maxReplicants);
+    }
+
+    public void addSegment(String tier, String segmentId, String serverId)
     {
       ConcurrentHashMap<String, String> segments = currentlyProcessingSegments.get(tier);
       if (segments == null) {
         segments = new ConcurrentHashMap<String, String>();
         currentlyProcessingSegments.put(tier, segments);
       }
-      if (segments.size() < maxReplicants) {
-        segments.put(segmentId, serverId);
-        return true;
-      }
-      return false;
+      if (!isAtMaxReplicants(tier)) {
+        segments.put(segmentId, serverId);
+      }
     }

     public void removeSegment(String tier, String segmentId, String serverId)
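
Taken together, these changes move the throttle decision out of registration and into the can* gates: the new isAtMaxReplicants(tier) reports whether a tier already has maxReplicants segments in flight, canCreateReplicant/canDestroyReplicant consult it along with the lookup flags, and the register methods shrink to void recorders. A self-contained sketch of the new contract, assuming made-up tier names, a maxReplicants of 2, and a toy driver (not the actual Druid class):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ThrottlerSketch
    {
      private final int maxReplicants = 2;
      private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessing =
          new HashMap<String, ConcurrentHashMap<String, String>>();

      // Mirrors the new helper: a tier is saturated once maxReplicants
      // segments are in flight for it.
      public boolean isAtMaxReplicants(String tier)
      {
        ConcurrentHashMap<String, String> segments = currentlyProcessing.get(tier);
        return segments != null && segments.size() >= maxReplicants;
      }

      // The real canCreateReplicant also checks replicatingLookup.get(tier).
      public boolean canCreateReplicant(String tier)
      {
        return !isAtMaxReplicants(tier);
      }

      // Now void: callers are expected to consult canCreateReplicant first.
      public void registerReplicantCreation(String tier, String segmentId, String serverId)
      {
        ConcurrentHashMap<String, String> segments = currentlyProcessing.get(tier);
        if (segments == null) {
          segments = new ConcurrentHashMap<String, String>();
          currentlyProcessing.put(tier, segments);
        }
        if (!isAtMaxReplicants(tier)) {
          segments.put(segmentId, serverId);
        }
      }

      public static void main(String[] args)
      {
        ThrottlerSketch throttler = new ThrottlerSketch();
        for (int i = 0; i < 4; i++) {
          String segmentId = "segment_" + i;
          if (throttler.canCreateReplicant("hot")) {
            throttler.registerReplicantCreation("hot", segmentId, "server1");
            System.out.println("replicating " + segmentId); // segment_0, segment_1
          } else {
            System.out.println("throttled " + segmentId);   // segment_2, segment_3
          }
        }
      }
    }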

LoadRule.java

@@ -94,7 +94,7 @@ public abstract class LoadRule implements Rule
     while (totalReplicants < expectedReplicants) {
       boolean replicate = totalReplicants > 0;

-      if (replicate && !replicationManager.canAddReplicant(getTier())) {
+      if (replicate && !replicationManager.canCreateReplicant(getTier())) {
         break;
       }

@@ -110,10 +110,10 @@ public abstract class LoadRule implements Rule
         break;
       }

-      if (replicate && !replicationManager.registerReplicantCreation(
-          getTier(), segment.getIdentifier(), holder.getServer().getHost()
-      )) {
-        break;
+      if (replicate) {
+        replicationManager.registerReplicantCreation(
+            getTier(), segment.getIdentifier(), holder.getServer().getHost()
+        );
       }

       holder.getPeon().loadSegment(
@@ -181,15 +181,16 @@ public abstract class LoadRule implements Rule
       if (holder.isServingSegment(segment)) {
         if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants

-          if (!replicationManager.canDestroyReplicant(getTier()) ||
-              !replicationManager.registerReplicantTermination(
-                  getTier(),
-                  segment.getIdentifier(),
-                  holder.getServer().getHost()
-              )) {
+          if (!replicationManager.canDestroyReplicant(getTier())) {
             serverQueue.add(holder);
             break;
           }
+
+          replicationManager.registerReplicantTermination(
+              getTier(),
+              segment.getIdentifier(),
+              holder.getServer().getHost()
+          );
         }

         holder.getPeon().dropSegment(
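
The call sites in LoadRule follow the same check-then-register shape: gate on canCreateReplicant/canDestroyReplicant first (breaking out, and on the drop path re-queuing the server, when throttled), then register the in-flight replicant unconditionally. A runnable sketch of the reworked drop path under those assumptions, with Throttler and the server queue as simplified stand-ins for the Druid types:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    public class DropPathSketch
    {
      // Hypothetical stand-in for ReplicationThrottler; allows two in-flight
      // terminations per tier before throttling.
      static class Throttler
      {
        private int inFlight = 0;

        boolean canDestroyReplicant(String tier)
        {
          return inFlight < 2;
        }

        void registerReplicantTermination(String tier, String segmentId, String host)
        {
          inFlight++;
        }
      }

      public static void main(String[] args)
      {
        Throttler replicationManager = new Throttler();
        Deque<String> serverQueue = new ArrayDeque<String>(Arrays.asList("s1", "s2", "s3", "s4"));

        while (!serverQueue.isEmpty()) {
          String holder = serverQueue.pollFirst();

          // Step 1: gate on the throttle; if saturated, re-queue the server and stop.
          if (!replicationManager.canDestroyReplicant("hot")) {
            serverQueue.add(holder);
            break;
          }

          // Step 2: record the in-flight termination, then issue the drop.
          replicationManager.registerReplicantTermination("hot", "segment_1", holder);
          System.out.println("dropping replicant from " + holder); // s1 and s2 only
        }
      }
    }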