From 7d58a2d8db45f56cd51e8d9e2e1a1e8c745857b8 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 18 Jul 2013 13:24:52 -0700 Subject: [PATCH] fix bug with enabling segments and improve replication logic --- .../druid/db/DatabaseSegmentManager.java | 27 ++++++++--------- .../druid/master/ReplicationThrottler.java | 30 +++++++++++-------- .../metamx/druid/master/rules/LoadRule.java | 23 +++++++------- 3 files changed, 41 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index 53c853d5a21..2096e0bd70a 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -20,7 +20,6 @@ package com.metamx.druid.db; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -33,7 +32,7 @@ import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; - +import com.metamx.druid.partition.PartitionChunk; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -45,7 +44,6 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; -import javax.annotation.Nullable; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; @@ -175,17 +173,16 @@ public class DatabaseSegmentManager } ); - final List segments = Lists.transform( - segmentTimeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01"))), - new Function, DataSegment>() - { - @Override - public DataSegment apply(@Nullable TimelineObjectHolder input) - { - return input.getObject().getChunk(0).getObject(); - } - } - ); + final List segments = Lists.newArrayList(); + for (TimelineObjectHolder objectHolder : segmentTimeline.lookup( + new Interval( + "0000-01-01/3000-01-01" + ) + )) { + for (PartitionChunk partitionChunk : objectHolder.getObject()) { + segments.add(partitionChunk.getObject()); + } + } if (segments.isEmpty()) { log.warn("No segments found in the database!"); @@ -451,4 +448,4 @@ public class DatabaseSegmentManager log.error(e, "Problem polling DB."); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java index 85743143438..fc2774374bd 100644 --- a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java +++ b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java @@ -84,19 +84,19 @@ public class ReplicationThrottler } } - public boolean canAddReplicant(String tier) + public boolean canCreateReplicant(String tier) { - return replicatingLookup.get(tier); + return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier); } public boolean canDestroyReplicant(String tier) { - return terminatingLookup.get(tier); + return terminatingLookup.get(tier) && !currentlyTerminating.isAtMaxReplicants(tier); } - public boolean registerReplicantCreation(String tier, String segmentId, String serverId) + public void registerReplicantCreation(String tier, String segmentId, String serverId) { - return currentlyReplicating.addSegment(tier, segmentId, serverId); + currentlyReplicating.addSegment(tier, segmentId, serverId); } public void unregisterReplicantCreation(String tier, String segmentId, String serverId) @@ -104,9 +104,9 @@ public class ReplicationThrottler currentlyReplicating.removeSegment(tier, segmentId, serverId); } - public boolean registerReplicantTermination(String tier, String segmentId, String serverId) + public void registerReplicantTermination(String tier, String segmentId, String serverId) { - return currentlyTerminating.addSegment(tier, segmentId, serverId); + currentlyTerminating.addSegment(tier, segmentId, serverId); } public void unregisterReplicantTermination(String tier, String segmentId, String serverId) @@ -119,19 +119,23 @@ public class ReplicationThrottler private final Map> currentlyProcessingSegments = Maps.newHashMap(); private final Map lifetimes = Maps.newHashMap(); - public boolean addSegment(String tier, String segmentId, String serverId) + public boolean isAtMaxReplicants(String tier) + { + final ConcurrentHashMap segments = currentlyProcessingSegments.get(tier); + return (segments != null && segments.size() >= maxReplicants); + } + + public void addSegment(String tier, String segmentId, String serverId) { ConcurrentHashMap segments = currentlyProcessingSegments.get(tier); if (segments == null) { segments = new ConcurrentHashMap(); currentlyProcessingSegments.put(tier, segments); } - if (segments.size() < maxReplicants) { - segments.put(segmentId, serverId); - return true; - } - return false; + if (!isAtMaxReplicants(tier)) { + segments.put(segmentId, serverId); + } } public void removeSegment(String tier, String segmentId, String serverId) diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 3c6c58593e7..8a9a014aa79 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -94,7 +94,7 @@ public abstract class LoadRule implements Rule while (totalReplicants < expectedReplicants) { boolean replicate = totalReplicants > 0; - if (replicate && !replicationManager.canAddReplicant(getTier())) { + if (replicate && !replicationManager.canCreateReplicant(getTier())) { break; } @@ -110,10 +110,10 @@ public abstract class LoadRule implements Rule break; } - if (replicate && !replicationManager.registerReplicantCreation( - getTier(), segment.getIdentifier(), holder.getServer().getHost() - )) { - break; + if (replicate) { + replicationManager.registerReplicantCreation( + getTier(), segment.getIdentifier(), holder.getServer().getHost() + ); } holder.getPeon().loadSegment( @@ -181,15 +181,16 @@ public abstract class LoadRule implements Rule if (holder.isServingSegment(segment)) { if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants - if (!replicationManager.canDestroyReplicant(getTier()) || - !replicationManager.registerReplicantTermination( - getTier(), - segment.getIdentifier(), - holder.getServer().getHost() - )) { + if (!replicationManager.canDestroyReplicant(getTier())) { serverQueue.add(holder); break; } + + replicationManager.registerReplicantTermination( + getTier(), + segment.getIdentifier(), + holder.getServer().getHost() + ); } holder.getPeon().dropSegment(