Merge branch 'master' into new-schema

fjy 2014-02-06 12:46:13 -08:00
commit 0f6af72ea4
31 changed files with 209 additions and 86 deletions

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -87,7 +87,8 @@ public class DbConnector
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
-+ " PRIMARY KEY (`id`)\n"
++ " PRIMARY KEY (`id`),\n"
++ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
)
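The new composite key covers the overlord's hot path: fetching active tasks ordered by creation time. Because `created_date` is a string column here, MySQL indexes only its first 100 characters (`created_date(100)`). A minimal sketch of the query shape this index serves; the `druid_tasks` table name, connection URL, and credentials are illustrative assumptions, not taken from the diff:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch only: an equality filter on `active` plus an order on `created_date`
// can be answered from the new KEY (active, created_date(100)) without a
// full table scan. Table name and credentials are assumptions.
public class ActiveTaskScan {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:mysql://localhost:3306/druid", "druid", "diurd");
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT id, payload FROM druid_tasks WHERE active = ? ORDER BY created_date")) {
      stmt.setBoolean(1, true);
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          System.out.println(rs.getString("id"));
        }
      }
    }
  }
}
```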

View File

@@ -1,8 +1,9 @@
---
layout: doc_page
---
-Data Formats for Ingestion
-==========================
+# Data Formats for Ingestion
Druid can ingest data in JSON, CSV, or TSV. While most examples in the documentation use data in JSON format, it is not difficult to configure Druid to ingest CSV or TSV data.
+## Formatting the Data
@@ -10,7 +11,7 @@ The following are three samples of the data used in the [Wikipedia example](Tuto
_JSON_
-```
+```json
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
@@ -43,7 +44,7 @@ Note that the CSV and TSV data do not contain column heads. This becomes importa
## Configuring Ingestion For the Indexing Service
If you use the [indexing service](Indexing-Service.html) for ingesting the data, a [task](Tasks.html) must be configured and submitted. Tasks are configured with a JSON object which, among other things, specifies the data source and type. In the Wikipedia example, JSON data was read from a local file. The task spec contains a firehose element to specify this:
-...
+```json
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing",
@@ -58,13 +59,13 @@ If you use the [indexing service](Indexing-Service.html) for ingesting the data,
}
}
}
-...
+```
Specified here are the location of the datafile, the timestamp column, the format of the data, and the columns that will become dimensions in Druid.
Since the CSV data does not contain the column names, they will have to be added before that data can be processed:
-...
+```json
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing/",
@@ -80,7 +81,7 @@ Since the CSV data does not contain the column names, they will have to be added
}
}
}
-...
+```
Note also that the filename extension and the data type were changed to "csv". For the TSV data, the same changes are made but with "tsv" for the filename extension and the data type.
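As a toy illustration of why headerless CSV and TSV need that explicit column list, the sketch below pairs declared column names with positional values. It uses the sample row from above and stands in for Druid's real parser classes, which this diff does not show:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a headerless CSV row carries no field names, so the
// ingestion spec must declare them and they are matched by position.
public class HeaderlessCsvSketch {
  public static void main(String[] args) {
    List<String> columns = Arrays.asList("timestamp", "page", "language", "user");
    String row = "2013-08-31T01:02:33Z,Gypsy Danger,en,nuclear";
    Map<String, String> event = new LinkedHashMap<>();
    String[] values = row.split(",");
    for (int i = 0; i < columns.size(); i++) {
      event.put(columns.get(i), values[i]); // declared column i <- value at position i
    }
    System.out.println(event); // {timestamp=2013-08-31T01:02:33Z, page=Gypsy Danger, ...}
  }
}
```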

View File

@@ -28,6 +28,7 @@ h2. Data Ingestion
* "Batch":./Batch-ingestion.html
* "Indexing Service":./Indexing-Service.html
** "Tasks":./Tasks.html
+* "Data Formats":./Data_formats.html
* "Ingestion FAQ":./Ingestion-FAQ.html
h2. Querying

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-hll</artifactId>
@@ -29,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -32,6 +32,7 @@ import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;
+import org.jboss.netty.channel.ChannelException;
import org.joda.time.Duration;
import java.io.IOException;
@@ -94,6 +95,7 @@ public class RemoteTaskActionClient implements TaskActionClient
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
+Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw Throwables.propagate(e);
}
@@ -105,7 +107,7 @@ public class RemoteTaskActionClient implements TaskActionClient
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
}
-catch (IOException e) {
+catch (IOException | ChannelException e) {
log.warn(e, "Exception submitting action for task[%s]", task.getId());
final Duration delay = retryPolicy.getAndIncrementRetryDelay();
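The effect of this hunk is to treat Netty's `ChannelException` the same way as `IOException`: a retryable transport failure rather than a fatal error. A minimal sketch of that retry shape, with an illustrative exponential backoff standing in for Druid's actual `RetryPolicy`:

```java
import java.io.IOException;

// Sketch: transport-level failures (IOException, Netty's ChannelException)
// are retried with backoff; anything else propagates immediately.
// The backoff numbers are illustrative, not Druid's actual RetryPolicy.
class RetrySketch {
  interface Call<T> { T run() throws Exception; }

  static <T> T withRetries(Call<T> call, int maxTries) throws Exception {
    long delayMillis = 1000;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.run();
      } catch (IOException | org.jboss.netty.channel.ChannelException e) {
        if (attempt >= maxTries) throw e; // retries exhausted
        Thread.sleep(delayMillis);
        delayMillis *= 2;                 // exponential backoff
      }
    }
  }
}
```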

View File

@@ -46,7 +46,6 @@ import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
-import org.apache.zookeeper.data.Stat;
import org.joda.time.DateTime;
import javax.ws.rs.Consumes;

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -148,7 +148,7 @@ public class DefaultLimitSpec implements LimitSpec
@Override
public int compare(Row left, Row right)
{
-return comparator.compare(left.getFloatMetric(column), right.getFloatMetric(column));
+return comparator.compare(left.getRaw(column), right.getRaw(column));
}
};
}
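Switching from `getFloatMetric` to `getRaw` lets the limit spec order rows by columns that are not numeric metrics; coercing a dimension such as `country` to a float would be meaningless. A toy illustration, with a plain map standing in for Druid's `Row` interface (which this hunk does not show):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Toy illustration: ordering by the raw column value instead of a float
// coercion, so string dimensions sort lexicographically.
public class RawOrdering {
  public static void main(String[] args) {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(Collections.singletonMap("country", (Object) "Russia"));
    rows.add(Collections.singletonMap("country", (Object) "Australia"));

    @SuppressWarnings("unchecked")
    Comparator<Object> comparator = (a, b) -> ((Comparable<Object>) a).compareTo(b);
    rows.sort((left, right) -> comparator.compare(left.get("country"), right.get("country")));

    System.out.println(rows); // [{country=Australia}, {country=Russia}]
  }
}
```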

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -313,7 +313,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals();
-if (server.isRealtime() || !populateCache || isBySegment) {
+if (!server.isAssignable() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else {
resultSeqToAdd = toolChest.mergeSequences(

View File

@@ -136,9 +136,9 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments);
}
-public boolean isRealtime()
+public boolean isAssignable()
{
-return getType().equalsIgnoreCase("realtime");
+return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
}
public DataSegment getSegment(String segmentName)
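The rename flips the question callers ask: rather than special-casing realtime nodes, the cluster code now whitelists the node types that can be assigned segments. A runnable restatement of the predicate from the hunk above:

```java
public class AssignableSketch {
  // Mirrors the hunk: only "historical" and "bridge" nodes accept segment
  // assignment; "realtime" (and any other type) falls out by default rather
  // than by special-case.
  static boolean isAssignable(String nodeType) {
    return nodeType.equalsIgnoreCase("historical") || nodeType.equalsIgnoreCase("bridge");
  }

  public static void main(String[] args) {
    System.out.println(isAssignable("historical")); // true
    System.out.println(isAssignable("bridge"));     // true
    System.out.println(isAssignable("realtime"));   // false
  }
}
```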

View File

@@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.UNREGISTER;
}
-if (server.isRealtime()) {
+if (!server.isAssignable()) {
return ServerView.CallbackAction.CONTINUE;
}

View File

@@ -291,7 +291,7 @@ public class DruidClusterBridge
DruidServer input
)
{
-return !input.isRealtime();
+return input.isAssignable();
}
}
);

View File

@@ -96,8 +96,8 @@ public class DruidCoordinator
private final Object lock = new Object();
private volatile boolean started = false;
+private volatile int leaderCounter = 0;
private volatile boolean leader = false;
-private volatile AtomicReference<CoordinatorDynamicConfig> dynamicConfigs;
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
@@ -184,7 +184,6 @@ public class DruidCoordinator
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.leaderLatch = new AtomicReference<>(null);
-this.dynamicConfigs = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
}
@@ -280,7 +279,11 @@ public class DruidCoordinator
public CoordinatorDynamicConfig getDynamicConfigs()
{
-return dynamicConfigs.get();
+return configManager.watch(
+CoordinatorDynamicConfig.CONFIG_KEY,
+CoordinatorDynamicConfig.class,
+new CoordinatorDynamicConfig.Builder().build()
+).get();
}
public void removeSegment(DataSegment segment)
@@ -533,6 +536,7 @@
log.info("I am the leader of the coordinators, all must bow!");
try {
+leaderCounter++;
leader = true;
databaseSegmentManager.start();
databaseRuleManager.start();
@@ -540,11 +544,6 @@
serviceAnnouncer.announce(self);
final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
-dynamicConfigs = configManager.watch(
-CoordinatorDynamicConfig.CONFIG_KEY,
-CoordinatorDynamicConfig.class,
-new CoordinatorDynamicConfig.Builder().build()
-);
coordinatorRunnables.add(Pair.of(new CoordinatorHistoricalManagerRunnable(), config.getCoordinatorPeriod()));
if (indexingServiceClient != null) {
coordinatorRunnables.add(
@@ -562,6 +561,7 @@
);
}
+final int startingLeaderCounter = leaderCounter;
for (final Pair<? extends CoordinatorRunnable, Duration> coordinatorRunnable : coordinatorRunnables) {
ScheduledExecutors.scheduleWithFixedDelay(
exec,
@@ -574,10 +574,10 @@
@Override
public ScheduledExecutors.Signal call()
{
-if (leader) {
+if (leader && startingLeaderCounter == leaderCounter) {
theRunnable.run();
}
-if (leader) { // (We might no longer be leader)
+if (leader && startingLeaderCounter == leaderCounter) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
@@ -610,6 +610,8 @@
{
synchronized (lock) {
try {
+leaderCounter++;
log.info("I am no longer the leader...");
for (String server : loadManagementPeons.keySet()) {
@@ -739,11 +741,10 @@
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
-.withDynamicConfigs(dynamicConfigs.get())
+.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.build();
for (DruidCoordinatorHelper helper : helpers) {
params = helper.run(params);
}
@@ -777,7 +778,7 @@
DruidServer input
)
{
-return !input.isRealtime();
+return input.isAssignable();
}
}
);
@@ -826,7 +827,6 @@
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTime.now())
-.withDynamicConfigs(dynamicConfigs.get())
.build();
}
},
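Taken together, these hunks replace the lone `leader` flag with a generation counter: every gain or loss of leadership bumps `leaderCounter`, each scheduled runnable captures the value current when it was scheduled, and a runnable from a stale generation stops itself instead of running alongside the new leader's tasks. A self-contained sketch of the pattern, with names and scheduling simplified:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the generation-counter guard the diff introduces (simplified).
class LeaderGuardSketch {
  private volatile int leaderCounter = 0;
  private volatile boolean leader = false;
  private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);

  void becomeLeader(Runnable work) {
    leaderCounter++;                                  // new leadership generation
    leader = true;
    final int startingLeaderCounter = leaderCounter;  // captured per schedule
    exec.scheduleWithFixedDelay(() -> {
      if (leader && startingLeaderCounter == leaderCounter) {
        work.run();                                   // still the same generation
      }
      // a task from a stale generation simply does nothing and fades out
    }, 0, 60, TimeUnit.SECONDS);
  }

  void stopBeingLeader() {
    leaderCounter++;                                  // invalidate old-generation tasks
    leader = false;
  }
}
```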

View File

@@ -88,7 +88,17 @@ public class SegmentReplicantLookup
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
-public int getClusterReplicants(String segmentId, String tier)
+public int getLoadedReplicants(String segmentId)
+{
+Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
+int retVal = 0;
+for (Integer replicants : allTiers.values()) {
+retVal += replicants;
+}
+return retVal;
+}
+public int getLoadedReplicants(String segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
@@ -100,8 +110,23 @@ public class SegmentReplicantLookup
return (retVal == null) ? 0 : retVal;
}
+public int getLoadingReplicants(String segmentId)
+{
+Map<String, Integer> allTiers = loadingSegments.row(segmentId);
+int retVal = 0;
+for (Integer replicants : allTiers.values()) {
+retVal += replicants;
+}
+return retVal;
+}
+public int getTotalReplicants(String segmentId)
+{
+return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
+}
public int getTotalReplicants(String segmentId, String tier)
{
-return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
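With these additions the coordinator can ask for loaded, loading, and total (loaded plus loading) replicant counts, per tier or across the whole cluster. A small sketch of that arithmetic, using Guava tables shaped like the class's `segmentsInCluster` and `loadingSegments` fields (the segment and tier names are made up):

```java
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

// Sketch: why getTotalReplicants(id) sums loaded and loading counts across
// every tier before the coordinator decides to assign another copy.
public class ReplicantMathSketch {
  public static void main(String[] args) {
    Table<String, String, Integer> loadedByTier = HashBasedTable.create();
    Table<String, String, Integer> loadingByTier = HashBasedTable.create();
    loadedByTier.put("seg1", "hot", 1);            // already served on the hot tier
    loadingByTier.put("seg1", "_default_tier", 1); // queued on the default tier

    int loaded = loadedByTier.row("seg1").values().stream().mapToInt(Integer::intValue).sum();
    int loading = loadingByTier.row("seg1").values().stream().mapToInt(Integer::intValue).sum();
    // Counting only loaded replicants would let the coordinator schedule a
    // third copy while the second is still in flight; loaded + loading avoids that.
    System.out.println("total replicants: " + (loaded + loading)); // prints 2
  }
}
```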

View File

@@ -42,6 +42,8 @@ import java.util.Map;
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
+private static final String assignedCount = "assignedCount";
+private static final String droppedCount = "droppedCount";
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
@@ -49,13 +51,17 @@
CoordinatorStats stats = new CoordinatorStats();
final Map<String, Integer> loadStatus = Maps.newHashMap();
+int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
-final int expectedReplicants = entry.getValue();
+final int expectedReplicantsInTier = entry.getValue();
+final int totalReplicantsInTier = params.getSegmentReplicantLookup()
+.getTotalReplicants(segment.getIdentifier(), tier);
+final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
+.getLoadedReplicants(segment.getIdentifier(), tier);
-int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
-MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
+final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return stats;
@@ -65,22 +71,21 @@
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
-stats.accumulate(
-assign(
+CoordinatorStats assignStats = assign(
params.getReplicationManager(),
tier,
-expectedReplicants,
-totalReplicants,
+totalReplicantsInCluster,
+expectedReplicantsInTier,
+totalReplicantsInTier,
strategy,
serverHolderList,
segment
-)
);
+stats.accumulate(assignStats);
+totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get();
}
-int clusterReplicants = params.getSegmentReplicantLookup()
-.getClusterReplicants(segment.getIdentifier(), tier);
-loadStatus.put(tier, expectedReplicants - clusterReplicants);
+loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
@@ -92,18 +97,21 @@
private CoordinatorStats assign(
final ReplicationThrottler replicationManager,
final String tier,
-final int expectedReplicants,
-int totalReplicants,
+final int totalReplicantsInCluster,
+final int expectedReplicantsInTier,
+final int totalReplicantsInTier,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
{
final CoordinatorStats stats = new CoordinatorStats();
-stats.addToTieredStat("assignedCount", tier, 0);
+stats.addToTieredStat(assignedCount, tier, 0);
-while (totalReplicants < expectedReplicants) {
-boolean replicate = totalReplicants > 0;
+int currReplicantsInTier = totalReplicantsInTier;
+int currTotalReplicantsInCluster = totalReplicantsInCluster;
+while (currReplicantsInTier < expectedReplicantsInTier) {
+boolean replicate = currTotalReplicantsInCluster > 0;
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
@@ -116,7 +124,7 @@
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
-expectedReplicants
+expectedReplicantsInTier
);
break;
}
@@ -143,8 +151,9 @@
}
);
-stats.addToTieredStat("assignedCount", tier, 1);
-++totalReplicants;
+stats.addToTieredStat(assignedCount, tier, 1);
+++currReplicantsInTier;
+++currTotalReplicantsInCluster;
}
return stats;
@@ -162,7 +171,7 @@
return stats;
}
-// Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
+// Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
@@ -176,10 +185,10 @@
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
-int actualNumReplicantsForTier = entry.getValue();
+int loadedNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
-stats.addToTieredStat("droppedCount", tier, 0);
+stats.addToTieredStat(droppedCount, tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
@@ -188,7 +197,7 @@
}
List<ServerHolder> droppedServers = Lists.newArrayList();
-while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
+while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@@ -224,8 +233,8 @@
}
}
);
---actualNumReplicantsForTier;
-stats.addToTieredStat("droppedCount", tier, 1);
+--loadedNumReplicantsForTier;
+stats.addToTieredStat(droppedCount, tier, 1);
}
droppedServers.add(holder);
}
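The core behavioral change in `assign` is that "is this a replica?" is now judged against the whole cluster rather than one tier: the first copy anywhere is an unthrottled primary, and every later copy, including the first copy in a second tier, is subject to the replication throttle. A simplified, runnable model (the throttle here is just a count cap, not Druid's actual `ReplicationThrottler`):

```java
public class ThrottleSketch {
  // Simplified model of the cross-tier rule: totalInCluster > 0 means any copy
  // already exists somewhere, so further assignments count as replicas.
  static int assign(int totalInCluster, int loadedInTier, int expectedInTier, int maxReplicasAllowed) {
    int assigned = 0;
    while (loadedInTier + assigned < expectedInTier) {
      boolean replicate = totalInCluster > 0;     // does any copy exist already?
      if (replicate && assigned >= maxReplicasAllowed) {
        break;                                    // replication throttled; primaries never are
      }
      assigned++;
      totalInCluster++;                           // tiers processed later see this copy
    }
    return assigned;
  }

  public static void main(String[] args) {
    // hot tier first: no copies exist, so one primary goes through unthrottled
    int hot = assign(0, 0, 1, 0);
    // _default_tier second: a copy now exists, so this one is a throttled replica
    int def = assign(hot, 0, 1, 0);
    System.out.println("hot=" + hot + " default=" + def); // hot=1 default=0
  }
}
```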

View File

@@ -8,7 +8,6 @@ $(document).ready(function() {
function handleTable(dontDisplay)
{
-console.log(type);
$.get(basePath + type + '?full', function(data) {
buildTable(data, $('#result_table'), dontDisplay);
@@ -76,7 +75,6 @@
}
$('#view_button').click(function() {
-console.log("here");
type = $('#select_type').val() + '';
view = $('#select_view').val() + '';

View File

@@ -979,6 +979,94 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.verify(mockPeon);
}
+/**
+* Nodes:
+* hot - nothing loaded
+* _default_tier - 1 segment loaded
+*
+* @throws Exception
+*/
+@Test
+public void testReplicantThrottleAcrossTiers() throws Exception
+{
+mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+EasyMock.expectLastCall().atLeastOnce();
+EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+EasyMock.replay(mockPeon);
+EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
+Lists.<Rule>newArrayList(
+new IntervalLoadRule(
+new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
+ImmutableMap.<String, Integer>of(
+"hot", 1,
+DruidServer.DEFAULT_TIER, 1
+),
+null,
+null
+)
+)
+).atLeastOnce();
+EasyMock.replay(databaseRuleManager);
+DruidCluster druidCluster = new DruidCluster(
+ImmutableMap.of(
+"hot",
+MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+Arrays.asList(
+new ServerHolder(
+new DruidServer(
+"serverHot",
+"hostHot",
+1000,
+"historical",
+"hot",
+0
+),
+mockPeon
+)
+)
+),
+DruidServer.DEFAULT_TIER,
+MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+Arrays.asList(
+new ServerHolder(
+new DruidServer(
+"serverNorm",
+"hostNorm",
+1000,
+"historical",
+DruidServer.DEFAULT_TIER,
+0
+),
+mockPeon
+)
+)
+)
+)
+);
+DruidCoordinatorRuntimeParams params =
+new DruidCoordinatorRuntimeParams.Builder()
+.withDruidCluster(druidCluster)
+.withAvailableSegments(availableSegments)
+.withDatabaseRuleManager(databaseRuleManager)
+.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+.build();
+DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator);
+DruidCoordinatorRuntimeParams afterParams = runner.run(params);
+CoordinatorStats stats = afterParams.getCoordinatorStats();
+Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
+Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
+Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
+Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+EasyMock.verify(mockPeon);
+}
@Test
public void testDropReplicantThrottle() throws Exception
{

View File

@@ -59,7 +59,7 @@ public class LoadRuleTest
public void setUp() throws Exception
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
-throttler = new ReplicationThrottler(1, 1);
+throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);

View File

@@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
-<version>0.6.53-SNAPSHOT</version>
+<version>0.6.57-SNAPSHOT</version>
</parent>
<dependencies>