Merge branch 'master' into subquery

This commit is contained in:
Yuval Oren 2014-02-06 15:36:29 -08:00
commit bd11309593
28 changed files with 197 additions and 77 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -87,7 +87,8 @@ public class DbConnector
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`)\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -46,7 +46,6 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -148,7 +148,7 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public int compare(Row left, Row right)
{
return comparator.compare(left.getFloatMetric(column), right.getFloatMetric(column));
return comparator.compare(left.getRaw(column), right.getRaw(column));
}
};
}

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -314,7 +314,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals();
if (server.isRealtime() || !populateCache || isBySegment) {
if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else {
resultSeqToAdd = toolChest.mergeSequences(

View File

@ -136,9 +136,9 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments);
}
public boolean isRealtime()
public boolean isAssignable()
{
return getType().equalsIgnoreCase("realtime");
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
}
public DataSegment getSegment(String segmentName)

View File

@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.UNREGISTER;
}
if (server.isRealtime()) {
if (!server.isAssignable()) {
return ServerView.CallbackAction.CONTINUE;
}

View File

@ -291,7 +291,7 @@ public class DruidClusterBridge
DruidServer input
)
{
return !input.isRealtime();
return input.isAssignable();
}
}
);

View File

@ -96,8 +96,8 @@ public class DruidCoordinator
private final Object lock = new Object();
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private volatile AtomicReference<CoordinatorDynamicConfig> dynamicConfigs;
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
@ -184,7 +184,6 @@ public class DruidCoordinator
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.leaderLatch = new AtomicReference<>(null);
this.dynamicConfigs = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
}
@ -280,7 +279,11 @@ public class DruidCoordinator
public CoordinatorDynamicConfig getDynamicConfigs()
{
return dynamicConfigs.get();
return configManager.watch(
CoordinatorDynamicConfig.CONFIG_KEY,
CoordinatorDynamicConfig.class,
new CoordinatorDynamicConfig.Builder().build()
).get();
}
public void removeSegment(DataSegment segment)
@ -533,6 +536,7 @@ public class DruidCoordinator
log.info("I am the leader of the coordinators, all must bow!");
try {
leaderCounter++;
leader = true;
databaseSegmentManager.start();
databaseRuleManager.start();
@ -540,11 +544,6 @@ public class DruidCoordinator
serviceAnnouncer.announce(self);
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
dynamicConfigs = configManager.watch(
CoordinatorDynamicConfig.CONFIG_KEY,
CoordinatorDynamicConfig.class,
new CoordinatorDynamicConfig.Builder().build()
);
coordinatorRunnables.add(Pair.of(new CoordinatorHistoricalManagerRunnable(), config.getCoordinatorPeriod()));
if (indexingServiceClient != null) {
coordinatorRunnables.add(
@ -562,6 +561,7 @@ public class DruidCoordinator
);
}
final int startingLeaderCounter = leaderCounter;
for (final Pair<? extends CoordinatorRunnable, Duration> coordinatorRunnable : coordinatorRunnables) {
ScheduledExecutors.scheduleWithFixedDelay(
exec,
@ -574,10 +574,10 @@ public class DruidCoordinator
@Override
public ScheduledExecutors.Signal call()
{
if (leader) {
if (leader && startingLeaderCounter == leaderCounter) {
theRunnable.run();
}
if (leader) { // (We might no longer be leader)
if (leader && startingLeaderCounter == leaderCounter) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
@ -610,6 +610,8 @@ public class DruidCoordinator
{
synchronized (lock) {
try {
leaderCounter++;
log.info("I am no longer the leader...");
for (String server : loadManagementPeons.keySet()) {
@ -739,11 +741,10 @@ public class DruidCoordinator
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withDynamicConfigs(dynamicConfigs.get())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.build();
for (DruidCoordinatorHelper helper : helpers) {
params = helper.run(params);
}
@ -777,7 +778,7 @@ public class DruidCoordinator
DruidServer input
)
{
return !input.isRealtime();
return input.isAssignable();
}
}
);
@ -826,7 +827,6 @@ public class DruidCoordinator
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTime.now())
.withDynamicConfigs(dynamicConfigs.get())
.build();
}
},

View File

@ -83,12 +83,22 @@ public class SegmentReplicantLookup
}
public Map<String, Integer> getLoadingTiers(String segmentId)
{
Map<String, Integer> retVal = loadingSegments.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
{
Map<String, Integer> retVal = loadingSegments.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
public int getClusterReplicants(String segmentId, String tier)
public int getLoadedReplicants(String segmentId)
{
Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}
public int getLoadedReplicants(String segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
@ -100,8 +110,23 @@ public class SegmentReplicantLookup
return (retVal == null) ? 0 : retVal;
}
public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}
public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}
public int getTotalReplicants(String segmentId, String tier)
{
return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}

View File

@ -42,6 +42,8 @@ import java.util.Map;
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
private static final String assignedCount = "assignedCount";
private static final String droppedCount = "droppedCount";
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
@ -49,13 +51,17 @@ public abstract class LoadRule implements Rule
CoordinatorStats stats = new CoordinatorStats();
final Map<String, Integer> loadStatus = Maps.newHashMap();
int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
final int expectedReplicants = entry.getValue();
final int expectedReplicantsInTier = entry.getValue();
final int totalReplicantsInTier = params.getSegmentReplicantLookup()
.getTotalReplicants(segment.getIdentifier(), tier);
final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
.getLoadedReplicants(segment.getIdentifier(), tier);
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return stats;
@ -65,22 +71,21 @@ public abstract class LoadRule implements Rule
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
tier,
expectedReplicants,
totalReplicants,
strategy,
serverHolderList,
segment
)
CoordinatorStats assignStats = assign(
params.getReplicationManager(),
tier,
totalReplicantsInCluster,
expectedReplicantsInTier,
totalReplicantsInTier,
strategy,
serverHolderList,
segment
);
stats.accumulate(assignStats);
totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get();
}
int clusterReplicants = params.getSegmentReplicantLookup()
.getClusterReplicants(segment.getIdentifier(), tier);
loadStatus.put(tier, expectedReplicants - clusterReplicants);
loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
@ -92,18 +97,21 @@ public abstract class LoadRule implements Rule
private CoordinatorStats assign(
final ReplicationThrottler replicationManager,
final String tier,
final int expectedReplicants,
int totalReplicants,
final int totalReplicantsInCluster,
final int expectedReplicantsInTier,
final int totalReplicantsInTier,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
{
final CoordinatorStats stats = new CoordinatorStats();
stats.addToTieredStat("assignedCount", tier, 0);
stats.addToTieredStat(assignedCount, tier, 0);
while (totalReplicants < expectedReplicants) {
boolean replicate = totalReplicants > 0;
int currReplicantsInTier = totalReplicantsInTier;
int currTotalReplicantsInCluster = totalReplicantsInCluster;
while (currReplicantsInTier < expectedReplicantsInTier) {
boolean replicate = currTotalReplicantsInCluster > 0;
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
@ -116,7 +124,7 @@ public abstract class LoadRule implements Rule
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
expectedReplicants
expectedReplicantsInTier
);
break;
}
@ -143,8 +151,9 @@ public abstract class LoadRule implements Rule
}
);
stats.addToTieredStat("assignedCount", tier, 1);
++totalReplicants;
stats.addToTieredStat(assignedCount, tier, 1);
++currReplicantsInTier;
++currTotalReplicantsInCluster;
}
return stats;
@ -162,7 +171,7 @@ public abstract class LoadRule implements Rule
return stats;
}
// Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
// Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
@ -176,10 +185,10 @@ public abstract class LoadRule implements Rule
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
int actualNumReplicantsForTier = entry.getValue();
int loadedNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
stats.addToTieredStat("droppedCount", tier, 0);
stats.addToTieredStat(droppedCount, tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
@ -188,7 +197,7 @@ public abstract class LoadRule implements Rule
}
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@ -224,8 +233,8 @@ public abstract class LoadRule implements Rule
}
}
);
--actualNumReplicantsForTier;
stats.addToTieredStat("droppedCount", tier, 1);
--loadedNumReplicantsForTier;
stats.addToTieredStat(droppedCount, tier, 1);
}
droppedServers.add(holder);
}

View File

@ -8,7 +8,6 @@ $(document).ready(function() {
function handleTable(dontDisplay)
{
console.log(type);
$.get(basePath + type + '?full', function(data) {
buildTable(data, $('#result_table'), dontDisplay);
@ -76,7 +75,6 @@ $(document).ready(function() {
}
$('#view_button').click(function() {
console.log("here");
type = $('#select_type').val() + '';
view = $('#select_view').val() + '';

View File

@ -979,6 +979,94 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* hot - nothing loaded
* _default_tier - 1 segment loaded
*
* @throws Exception
*/
@Test
public void testReplicantThrottleAcrossTiers() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
ImmutableMap.<String, Integer>of(
"hot", 1,
DruidServer.DEFAULT_TIER, 1
),
null,
null
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot",
0
),
mockPeon
)
)
),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER,
0
),
mockPeon
)
)
)
)
);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator);
DruidCoordinatorRuntimeParams afterParams = runner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
}
@Test
public void testDropReplicantThrottle() throws Exception
{

View File

@ -59,7 +59,7 @@ public class LoadRuleTest
public void setUp() throws Exception
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
throttler = new ReplicationThrottler(1, 1);
throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.54-SNAPSHOT</version>
<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>