mirror of https://github.com/apache/druid.git
Merge branch 'master' into indexing_refactor
Conflicts: merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java server/src/main/java/com/metamx/druid/master/DruidMaster.java
This commit is contained in:
commit
111df9a90e
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -48,6 +48,8 @@ import java.util.Map;
|
|||
public class DataSegment implements Comparable<DataSegment>
|
||||
{
|
||||
public static String delimiter = "_";
|
||||
private final Integer binaryVersion;
|
||||
|
||||
public static String makeDataSegmentIdentifier(
|
||||
String dataSource,
|
||||
DateTime start,
|
||||
|
@ -89,6 +91,7 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
@JsonProperty("dimensions") @JsonDeserialize(using = CommaListJoinDeserializer.class) List<String> dimensions,
|
||||
@JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) List<String> metrics,
|
||||
@JsonProperty("shardSpec") ShardSpec shardSpec,
|
||||
@JsonProperty("binaryVersion") Integer binaryVersion,
|
||||
@JsonProperty("size") long size
|
||||
)
|
||||
{
|
||||
|
@ -112,6 +115,7 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
? ImmutableList.<String>of()
|
||||
: ImmutableList.copyOf(Iterables.filter(metrics, nonEmpty));
|
||||
this.shardSpec = (shardSpec == null) ? new NoneShardSpec() : shardSpec;
|
||||
this.binaryVersion = binaryVersion;
|
||||
this.size = size;
|
||||
|
||||
this.identifier = makeDataSegmentIdentifier(
|
||||
|
@ -172,6 +176,12 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
return shardSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Integer getBinaryVersion()
|
||||
{
|
||||
return binaryVersion;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getSize()
|
||||
{
|
||||
|
@ -209,6 +219,11 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
return builder(this).version(version).build();
|
||||
}
|
||||
|
||||
public DataSegment withBinaryVersion(int binaryVersion)
|
||||
{
|
||||
return builder(this).binaryVersion(binaryVersion).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(DataSegment dataSegment)
|
||||
{
|
||||
|
@ -287,6 +302,7 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
private List<String> dimensions;
|
||||
private List<String> metrics;
|
||||
private ShardSpec shardSpec;
|
||||
private Integer binaryVersion;
|
||||
private long size;
|
||||
|
||||
public Builder()
|
||||
|
@ -307,6 +323,7 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
this.dimensions = segment.getDimensions();
|
||||
this.metrics = segment.getMetrics();
|
||||
this.shardSpec = segment.getShardSpec();
|
||||
this.binaryVersion = segment.getBinaryVersion();
|
||||
this.size = segment.getSize();
|
||||
}
|
||||
|
||||
|
@ -352,6 +369,12 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder binaryVersion(Integer binaryVersion)
|
||||
{
|
||||
this.binaryVersion = binaryVersion;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder size(long size)
|
||||
{
|
||||
this.size = size;
|
||||
|
@ -374,6 +397,7 @@ public class DataSegment implements Comparable<DataSegment>
|
|||
dimensions,
|
||||
metrics,
|
||||
shardSpec,
|
||||
binaryVersion,
|
||||
size
|
||||
);
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import org.skife.config.Default;
|
|||
public abstract class MemcachedCacheConfig
|
||||
{
|
||||
@Config("${prefix}.expiration")
|
||||
@Default("31536000")
|
||||
@Default("2592000")
|
||||
public abstract int getExpiration();
|
||||
|
||||
@Config("${prefix}.timeout")
|
||||
|
@ -17,8 +17,10 @@ public abstract class MemcachedCacheConfig
|
|||
public abstract String getHosts();
|
||||
|
||||
@Config("${prefix}.maxObjectSize")
|
||||
@Default("52428800")
|
||||
public abstract int getMaxObjectSize();
|
||||
|
||||
@Config("${prefix}.memcachedPrefix")
|
||||
@Default("druid")
|
||||
public abstract String getMemcachedPrefix();
|
||||
}
|
||||
|
|
|
@ -112,7 +112,8 @@ class StoppedPhoneBook implements PhoneBook
|
|||
}
|
||||
|
||||
if (! serviceAnnouncements.containsKey(nodeName)) {
|
||||
throw new IAE("Cannot unannounce node[%s] on service[%s]", nodeName, serviceName);
|
||||
log.warn("Cannot unannounce[%s]: it doesn't exist for service[%s]", nodeName, serviceName);
|
||||
return;
|
||||
}
|
||||
|
||||
serviceAnnouncements.remove(nodeName);
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import com.metamx.druid.shard.SingleDimensionShardSpec;
|
||||
|
@ -60,12 +61,13 @@ public class DataSegmentTest
|
|||
Arrays.asList("dim1", "dim2"),
|
||||
Arrays.asList("met1", "met2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
1
|
||||
);
|
||||
|
||||
final Map<String, Object> objectMap = mapper.readValue(mapper.writeValueAsString(segment), new TypeReference<Map<String, Object>>(){});
|
||||
|
||||
Assert.assertEquals(9, objectMap.size());
|
||||
Assert.assertEquals(10, objectMap.size());
|
||||
Assert.assertEquals("something", objectMap.get("dataSource"));
|
||||
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
|
||||
Assert.assertEquals("1", objectMap.get("version"));
|
||||
|
@ -73,6 +75,7 @@ public class DataSegmentTest
|
|||
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
|
||||
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
|
||||
Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
|
||||
Assert.assertEquals(IndexIO.CURRENT_VERSION_ID, objectMap.get("binaryVersion"));
|
||||
Assert.assertEquals(1, objectMap.get("size"));
|
||||
|
||||
DataSegment deserializedSegment = mapper.readValue(mapper.writeValueAsString(segment), DataSegment.class);
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -24,11 +24,11 @@
|
|||
<artifactId>druid-services</artifactId>
|
||||
<name>druid-services</name>
|
||||
<description>druid-services</description>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modules>
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid-examples</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid-examples</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -109,7 +109,7 @@ public class IndexIO
|
|||
|
||||
|
||||
private static volatile IndexIOHandler handler = null;
|
||||
public static final byte CURRENT_VERSION_ID = 0x8;
|
||||
public static final int CURRENT_VERSION_ID = V9_VERSION;
|
||||
|
||||
public static Index readIndex(File inDir) throws IOException
|
||||
{
|
||||
|
@ -170,7 +170,7 @@ public class IndexIO
|
|||
}
|
||||
}
|
||||
|
||||
private static int getVersionFromDir(File inDir) throws IOException
|
||||
public static int getVersionFromDir(File inDir) throws IOException
|
||||
{
|
||||
File versionFile = new File(inDir, "version.bin");
|
||||
if (versionFile.exists()) {
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -146,7 +146,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
|
||||
|
||||
if(!groupByJob.waitForCompletion(true)) {
|
||||
log.error("Job failed: %s", groupByJob.getJobID().toString());
|
||||
log.error("Job failed: %s", groupByJob.getJobID());
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -393,6 +393,7 @@ public class IndexGeneratorJob implements Jobby
|
|||
dimensionNames,
|
||||
metricNames,
|
||||
config.getShardSpec(bucket).getActualSpec(),
|
||||
IndexIO.getVersionFromDir(mergedBase),
|
||||
size
|
||||
);
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -83,9 +83,7 @@ public class YeOldePlumberSchool implements PlumberSchool
|
|||
final Sink theSink = new Sink(interval, schema);
|
||||
|
||||
// Temporary directory to hold spilled segments.
|
||||
final File persistDir = new File(
|
||||
tmpSegmentDir, theSink.getSegment().withVersion(version).getIdentifier()
|
||||
);
|
||||
final File persistDir = new File(tmpSegmentDir, theSink.getSegment().withVersion(version).getIdentifier());
|
||||
|
||||
// Set of spilled segments. Will be merged at the end.
|
||||
final Set<File> spilled = Sets.newHashSet();
|
||||
|
@ -144,7 +142,8 @@ public class YeOldePlumberSchool implements PlumberSchool
|
|||
|
||||
final DataSegment segmentToUpload = theSink.getSegment()
|
||||
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
|
||||
.withVersion(version);
|
||||
.withVersion(version)
|
||||
.withBinaryVersion(IndexIO.getVersionFromDir(fileToUpload));
|
||||
|
||||
segmentPusher.push(fileToUpload, segmentToUpload);
|
||||
|
||||
|
|
|
@ -128,17 +128,19 @@ public abstract class MergeTask extends AbstractTask
|
|||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
log.info(
|
||||
"Starting merge of id[%s], segments: %s", getId(), Lists.transform(
|
||||
segments,
|
||||
new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DataSegment input)
|
||||
{
|
||||
return input.getIdentifier();
|
||||
}
|
||||
}
|
||||
)
|
||||
"Starting merge of id[%s], segments: %s",
|
||||
getId(),
|
||||
Lists.transform(
|
||||
segments,
|
||||
new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DataSegment input)
|
||||
{
|
||||
return input.getIdentifier();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ public interface Task
|
|||
|
||||
/**
|
||||
* Execute preflight checks for a task. This typically runs on the coordinator, and will be run while
|
||||
* holding a lock on our dataSouce and interval. If this method throws an exception, the task should be
|
||||
* holding a lock on our dataSource and interval. If this method throws an exception, the task should be
|
||||
* considered a failure.
|
||||
*
|
||||
* @param context Context for this task, gathered under indexer lock
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package com.metamx.druid.merger.common.task;
|
||||
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.coordinator.TaskContext;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class V8toV9UpgradeTask extends AbstractTask
|
||||
{
|
||||
public V8toV9UpgradeTask(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
)
|
||||
{
|
||||
super(
|
||||
String.format("v8tov9_%s_%s_%s", dataSource, interval.toString().replace("/", "_"), new DateTime()),
|
||||
dataSource,
|
||||
interval
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType()
|
||||
{
|
||||
throw new UnsupportedOperationException("Do we really need to return a Type?");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus run(
|
||||
TaskContext context, TaskToolbox toolbox
|
||||
) throws Exception
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
|
@ -32,12 +32,14 @@ import java.util.Map;
|
|||
|
||||
public class MergeTaskTest
|
||||
{
|
||||
final List<DataSegment> segments =
|
||||
ImmutableList
|
||||
.<DataSegment>builder()
|
||||
.add(new DataSegment("foo", new Interval("2012-01-04/2012-01-06"), "V1", null, null, null, null, -1))
|
||||
.add(new DataSegment("foo", new Interval("2012-01-05/2012-01-07"), "V1", null, null, null, null, -1))
|
||||
.add(new DataSegment("foo", new Interval("2012-01-03/2012-01-05"), "V1", null, null, null, null, -1))
|
||||
private final DataSegment.Builder segmentBuilder = DataSegment.builder()
|
||||
.dataSource("foo")
|
||||
.version("V1");
|
||||
|
||||
final List<DataSegment> segments = ImmutableList.<DataSegment>builder()
|
||||
.add(segmentBuilder.interval(new Interval("2012-01-04/2012-01-06")).build())
|
||||
.add(segmentBuilder.interval(new Interval("2012-01-05/2012-01-07")).build())
|
||||
.add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build())
|
||||
.build();
|
||||
|
||||
final MergeTask testMergeTask = new MergeTask("foo", segments)
|
||||
|
|
|
@ -13,12 +13,9 @@ import com.metamx.druid.merger.common.TaskStatus;
|
|||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.config.IndexerZkConfig;
|
||||
import com.metamx.druid.merger.common.config.TaskConfig;
|
||||
import com.metamx.druid.merger.common.task.DefaultMergeTask;
|
||||
import com.metamx.druid.merger.common.task.Task;
|
||||
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
||||
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
|
||||
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
|
||||
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
|
||||
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
|
||||
import com.metamx.druid.merger.worker.TaskMonitor;
|
||||
|
@ -31,8 +28,6 @@ import com.netflix.curator.retry.ExponentialBackoffRetry;
|
|||
import com.netflix.curator.test.TestingCluster;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.codehaus.jackson.annotate.JsonProperty;
|
||||
import org.codehaus.jackson.annotate.JsonTypeName;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.map.jsontype.NamedType;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -45,7 +40,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
|
@ -113,6 +108,7 @@ public class RemoteTaskRunnerTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
0,
|
||||
0
|
||||
)
|
||||
), Lists.<AggregatorFactory>newArrayList()
|
||||
|
@ -143,13 +139,25 @@ public class RemoteTaskRunnerTest
|
|||
@Test
|
||||
public void testAlreadyExecutedTask() throws Exception
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
remoteTaskRunner.run(
|
||||
task1,
|
||||
new TaskContext(
|
||||
new DateTime().toString(),
|
||||
Sets.<DataSegment>newHashSet(),
|
||||
Sets.<DataSegment>newHashSet()
|
||||
),
|
||||
new TestTask(
|
||||
task1.getId(),
|
||||
task1.getDataSource(),
|
||||
Lists.<DataSegment>newArrayList(),
|
||||
Lists.<AggregatorFactory>newArrayList()
|
||||
)
|
||||
{
|
||||
@Override
|
||||
public TaskStatus run(
|
||||
TaskContext context, TaskToolbox toolbox, TaskCallback callback
|
||||
) throws Exception
|
||||
{
|
||||
latch.await();
|
||||
return super.run(context, toolbox, callback);
|
||||
}
|
||||
},
|
||||
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet(), Sets.<DataSegment>newHashSet()),
|
||||
null
|
||||
);
|
||||
try {
|
||||
|
@ -162,10 +170,11 @@ public class RemoteTaskRunnerTest
|
|||
),
|
||||
null
|
||||
);
|
||||
latch.countDown();
|
||||
fail("ISE expected");
|
||||
}
|
||||
catch (ISE expected) {
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,6 +196,7 @@ public class RemoteTaskRunnerTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
0,
|
||||
0
|
||||
)
|
||||
), Lists.<AggregatorFactory>newArrayList()
|
||||
|
|
|
@ -439,6 +439,7 @@ public class TaskQueueTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
-1
|
||||
)
|
||||
)
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -23,7 +23,7 @@
|
|||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -30,6 +30,7 @@ import com.metamx.common.logger.Logger;
|
|||
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IncrementalIndex;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.input.InputRow;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -134,6 +135,7 @@ public class Sink implements Iterable<FireHydrant>
|
|||
}
|
||||
}),
|
||||
schema.getShardSpec(),
|
||||
null,
|
||||
0
|
||||
);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.2.3-SNAPSHOT</version>
|
||||
<version>0.2.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -126,7 +126,9 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
|
|||
getConfigFactory().buildWithReplacements(
|
||||
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
|
||||
)
|
||||
), emitter, new ServiceMetricEvent.Builder()
|
||||
),
|
||||
emitter,
|
||||
new ServiceMetricEvent.Builder()
|
||||
);
|
||||
|
||||
final ServerManager serverManager = new ServerManager(segmentLoader, conglomerate, emitter, executorService);
|
||||
|
|
|
@ -407,7 +407,7 @@ public class IndexMerger
|
|||
try {
|
||||
fileOutputStream = new FileOutputStream(indexFile);
|
||||
channel = fileOutputStream.getChannel();
|
||||
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.CURRENT_VERSION_ID}));
|
||||
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
|
||||
|
||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy).writeToChannel(channel);
|
||||
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy).writeToChannel(channel);
|
||||
|
@ -770,7 +770,7 @@ public class IndexMerger
|
|||
}
|
||||
|
||||
createIndexDrdFile(
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
IndexIO.V8_VERSION,
|
||||
v8OutDir,
|
||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
|
||||
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.io.Closeables;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.StreamUtils;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -63,9 +64,9 @@ public class S3SegmentPusher implements SegmentPusher
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataSegment push(File file, DataSegment segment) throws IOException
|
||||
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
|
||||
{
|
||||
log.info("Uploading [%s] to S3", file);
|
||||
log.info("Uploading [%s] to S3", indexFilesDir);
|
||||
String outputKey = JOINER.join(
|
||||
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
|
||||
segment.getDataSource(),
|
||||
|
@ -78,8 +79,6 @@ public class S3SegmentPusher implements SegmentPusher
|
|||
segment.getShardSpec().getPartitionNum()
|
||||
);
|
||||
|
||||
File indexFilesDir = file;
|
||||
|
||||
long indexSize = 0;
|
||||
final File zipOutFile = File.createTempFile("druid", "index.zip");
|
||||
ZipOutputStream zipOut = null;
|
||||
|
@ -110,14 +109,15 @@ public class S3SegmentPusher implements SegmentPusher
|
|||
log.info("Pushing %s.", toPush);
|
||||
s3Client.putObject(outputBucket, toPush);
|
||||
|
||||
DataSegment outputSegment = segment.withSize(indexSize)
|
||||
.withLoadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type", "s3_zip",
|
||||
"bucket", outputBucket,
|
||||
"key", toPush.getKey()
|
||||
)
|
||||
);
|
||||
segment = segment.withSize(indexSize)
|
||||
.withLoadSpec(
|
||||
ImmutableMap.<String, Object>of(
|
||||
"type", "s3_zip",
|
||||
"bucket", outputBucket,
|
||||
"key", toPush.getKey()
|
||||
)
|
||||
)
|
||||
.withBinaryVersion(IndexIO.getVersionFromDir(indexFilesDir));
|
||||
|
||||
File descriptorFile = File.createTempFile("druid", "descriptor.json");
|
||||
StreamUtils.copyToFileAndClose(new ByteArrayInputStream(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
|
||||
|
@ -137,7 +137,7 @@ public class S3SegmentPusher implements SegmentPusher
|
|||
log.info("Deleting descriptor file[%s]", descriptorFile);
|
||||
descriptorFile.delete();
|
||||
|
||||
return outputSegment;
|
||||
return segment;
|
||||
}
|
||||
catch (NoSuchAlgorithmException e) {
|
||||
throw new IOException(e);
|
||||
|
|
|
@ -21,7 +21,6 @@ package com.metamx.druid.master;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -43,12 +42,9 @@ import com.metamx.druid.client.ServerInventoryManager;
|
|||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.merge.ClientKillQuery;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.metamx.phonebook.PhoneBookPeon;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
|
@ -57,7 +53,6 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|||
import org.joda.time.Duration;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -91,8 +86,6 @@ public class DruidMaster
|
|||
private final PhoneBookPeon masterPeon;
|
||||
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
||||
private final ServiceProvider serviceProvider;
|
||||
private final HttpClient httpClient;
|
||||
private final HttpResponseHandler<StringBuilder, String> responseHandler;
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
|
@ -107,9 +100,7 @@ public class DruidMaster
|
|||
ServiceEmitter emitter,
|
||||
ScheduledExecutorFactory scheduledExecutorFactory,
|
||||
Map<String, LoadQueuePeon> loadManagementPeons,
|
||||
ServiceProvider serviceProvider,
|
||||
HttpClient httpClient,
|
||||
HttpResponseHandler<StringBuilder, String> responseHandler
|
||||
ServiceProvider serviceProvider
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
|
@ -130,9 +121,6 @@ public class DruidMaster
|
|||
this.loadManagementPeons = loadManagementPeons;
|
||||
|
||||
this.serviceProvider = serviceProvider;
|
||||
|
||||
this.httpClient = httpClient;
|
||||
this.responseHandler = responseHandler;
|
||||
}
|
||||
|
||||
public boolean isClusterMaster()
|
||||
|
@ -208,27 +196,6 @@ public class DruidMaster
|
|||
databaseSegmentManager.enableDatasource(ds);
|
||||
}
|
||||
|
||||
public void killSegments(ClientKillQuery killQuery)
|
||||
{
|
||||
try {
|
||||
httpClient.post(
|
||||
new URL(
|
||||
String.format(
|
||||
"http://%s:%s/mmx/merger/v1/index",
|
||||
serviceProvider.getInstance().getAddress(),
|
||||
serviceProvider.getInstance().getPort()
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContent("application/json", jsonMapper.writeValueAsBytes(killQuery))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
|
||||
{
|
||||
final DruidServer fromServer = serverInventoryManager.getInventoryValue(from);
|
||||
|
@ -623,21 +590,20 @@ public class DruidMaster
|
|||
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
||||
{
|
||||
// Display info about all historical servers
|
||||
Iterable<DruidServer> servers =
|
||||
FunctionalIterable.create(serverInventoryManager.getInventory())
|
||||
.filter(
|
||||
new Predicate<DruidServer>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(
|
||||
@Nullable DruidServer input
|
||||
)
|
||||
{
|
||||
return input.getType()
|
||||
.equalsIgnoreCase("historical");
|
||||
}
|
||||
}
|
||||
);
|
||||
Iterable<DruidServer> servers =FunctionalIterable
|
||||
.create(serverInventoryManager.getInventory())
|
||||
.filter(
|
||||
new Predicate<DruidServer>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(
|
||||
@Nullable DruidServer input
|
||||
)
|
||||
{
|
||||
return input.getType().equalsIgnoreCase("historical");
|
||||
}
|
||||
}
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Servers");
|
||||
for (DruidServer druidServer : servers) {
|
||||
|
@ -695,14 +661,18 @@ public class DruidMaster
|
|||
decrementRemovedSegmentsLifetime();
|
||||
|
||||
return params.buildFromExisting()
|
||||
.withLoadManagementPeons(loadManagementPeons)
|
||||
.withDruidCluster(cluster)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withLoadManagementPeons(loadManagementPeons)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.build();
|
||||
}
|
||||
},
|
||||
new DruidMasterRuleRunner(DruidMaster.this),
|
||||
new DruidMasterRuleRunner(
|
||||
DruidMaster.this,
|
||||
config.getReplicantLifetime(),
|
||||
config.getReplicantThrottleLimit()
|
||||
),
|
||||
new DruidMasterCleanup(DruidMaster.this),
|
||||
new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()),
|
||||
new DruidMasterLogger()
|
||||
|
@ -718,14 +688,7 @@ public class DruidMaster
|
|||
super(
|
||||
ImmutableList.of(
|
||||
new DruidMasterSegmentInfoLoader(DruidMaster.this),
|
||||
new DruidMasterSegmentMerger(
|
||||
new HttpMergerClient(
|
||||
httpClient,
|
||||
responseHandler,
|
||||
jsonMapper,
|
||||
serviceProvider
|
||||
)
|
||||
),
|
||||
new DruidMasterSegmentMerger(jsonMapper, serviceProvider),
|
||||
new DruidMasterHelper()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.google.common.collect.Sets;
|
||||
|
@ -30,7 +28,6 @@ import com.metamx.druid.client.DruidServer;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
@ -71,13 +68,10 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) {
|
||||
holder.reduceLifetime();
|
||||
if (holder.getLifetime() <= 0) {
|
||||
log.makeAlert(
|
||||
"[%s]: Balancer move segments queue has a segment stuck",
|
||||
tier,
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("segment", holder.getSegment().getIdentifier())
|
||||
.build()
|
||||
).emit();
|
||||
log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier)
|
||||
.addData("segment", holder.getSegment().getIdentifier())
|
||||
.addData("server", holder.getServer())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -97,11 +91,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
|
||||
if (!currentlyMovingSegments.get(tier).isEmpty()) {
|
||||
reduceLifetimes(tier);
|
||||
log.info(
|
||||
"[%s]: Still waiting on %,d segments to be moved",
|
||||
tier,
|
||||
currentlyMovingSegments.size()
|
||||
);
|
||||
log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.size());
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -110,8 +100,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
|
||||
if (serversByPercentUsed.size() <= 1) {
|
||||
log.info(
|
||||
"[%s]: No unique values found for highest and lowest percent used servers: nothing to balance",
|
||||
tier
|
||||
"[%s]: No unique values found for highest and lowest percent used servers: nothing to balance", tier
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
@ -175,12 +164,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
|
||||
(server.getSegment(segmentName) == null) &&
|
||||
new ServerHolder(server, toPeon).getAvailableSize() > segmentToMove.getSize()) {
|
||||
log.info(
|
||||
"Moving [%s] from [%s] to [%s]",
|
||||
segmentName,
|
||||
fromServer,
|
||||
toServer
|
||||
);
|
||||
log.info("Moving [%s] from [%s] to [%s]", segmentName, fromServer, toServer);
|
||||
try {
|
||||
master.moveSegment(
|
||||
fromServer,
|
||||
|
|
|
@ -80,4 +80,12 @@ public abstract class DruidMasterConfig
|
|||
{
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Config("druid.master.replicant.lifetime")
|
||||
@Default("15")
|
||||
public abstract int getReplicantLifetime();
|
||||
|
||||
@Config("druid.master.replicant.throttleLimit")
|
||||
@Default("10")
|
||||
public abstract int getReplicantThrottleLimit();
|
||||
}
|
||||
|
|
|
@ -33,11 +33,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
|
|||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);
|
||||
|
||||
private final ReplicationThrottler replicatorThrottler;
|
||||
|
||||
private final DruidMaster master;
|
||||
|
||||
public DruidMasterRuleRunner(DruidMaster master)
|
||||
public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit)
|
||||
{
|
||||
this.master = master;
|
||||
this.replicatorThrottler = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,16 +54,25 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
|
|||
return params;
|
||||
}
|
||||
|
||||
for (String tier : cluster.getTierNames()) {
|
||||
replicatorThrottler.updateReplicationState(tier);
|
||||
replicatorThrottler.updateTerminationState(tier);
|
||||
}
|
||||
|
||||
DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
|
||||
.withReplicationManager(replicatorThrottler)
|
||||
.build();
|
||||
|
||||
// Run through all matched rules for available segments
|
||||
DateTime now = new DateTime();
|
||||
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
|
||||
for (DataSegment segment : params.getAvailableSegments()) {
|
||||
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
|
||||
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
|
||||
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
|
||||
|
||||
boolean foundMatchingRule = false;
|
||||
for (Rule rule : rules) {
|
||||
if (rule.appliesTo(segment, now)) {
|
||||
stats.accumulate(rule.run(master, params, segment));
|
||||
stats.accumulate(rule.run(master, paramsWithReplicationManager, segment));
|
||||
foundMatchingRule = true;
|
||||
break;
|
||||
}
|
||||
|
@ -76,8 +88,8 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
|
|||
}
|
||||
}
|
||||
|
||||
return params.buildFromExisting()
|
||||
.withMasterStats(stats)
|
||||
.build();
|
||||
return paramsWithReplicationManager.buildFromExisting()
|
||||
.withMasterStats(stats)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import com.metamx.common.guava.Comparators;
|
|||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidDataSource;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.master.rules.RuleMap;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -44,6 +43,7 @@ public class DruidMasterRuntimeParams
|
|||
private final Set<DruidDataSource> dataSources;
|
||||
private final Set<DataSegment> availableSegments;
|
||||
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
||||
private final ReplicationThrottler replicationManager;
|
||||
private final ServiceEmitter emitter;
|
||||
private final long millisToWaitBeforeDeleting;
|
||||
private final MasterStats stats;
|
||||
|
@ -58,6 +58,7 @@ public class DruidMasterRuntimeParams
|
|||
Set<DruidDataSource> dataSources,
|
||||
Set<DataSegment> availableSegments,
|
||||
Map<String, LoadQueuePeon> loadManagementPeons,
|
||||
ReplicationThrottler replicationManager,
|
||||
ServiceEmitter emitter,
|
||||
long millisToWaitBeforeDeleting,
|
||||
MasterStats stats,
|
||||
|
@ -72,6 +73,7 @@ public class DruidMasterRuntimeParams
|
|||
this.dataSources = dataSources;
|
||||
this.availableSegments = availableSegments;
|
||||
this.loadManagementPeons = loadManagementPeons;
|
||||
this.replicationManager = replicationManager;
|
||||
this.emitter = emitter;
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
|
||||
this.stats = stats;
|
||||
|
@ -114,6 +116,11 @@ public class DruidMasterRuntimeParams
|
|||
return loadManagementPeons;
|
||||
}
|
||||
|
||||
public ReplicationThrottler getReplicationManager()
|
||||
{
|
||||
return replicationManager;
|
||||
}
|
||||
|
||||
public ServiceEmitter getEmitter()
|
||||
{
|
||||
return emitter;
|
||||
|
@ -159,6 +166,7 @@ public class DruidMasterRuntimeParams
|
|||
dataSources,
|
||||
availableSegments,
|
||||
loadManagementPeons,
|
||||
replicationManager,
|
||||
emitter,
|
||||
millisToWaitBeforeDeleting,
|
||||
stats,
|
||||
|
@ -176,6 +184,7 @@ public class DruidMasterRuntimeParams
|
|||
private final Set<DruidDataSource> dataSources;
|
||||
private final Set<DataSegment> availableSegments;
|
||||
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
||||
private ReplicationThrottler replicationManager;
|
||||
private ServiceEmitter emitter;
|
||||
private long millisToWaitBeforeDeleting;
|
||||
private MasterStats stats;
|
||||
|
@ -191,6 +200,7 @@ public class DruidMasterRuntimeParams
|
|||
this.dataSources = Sets.newHashSet();
|
||||
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
|
||||
this.loadManagementPeons = Maps.newHashMap();
|
||||
this.replicationManager = null;
|
||||
this.emitter = null;
|
||||
this.millisToWaitBeforeDeleting = 0;
|
||||
this.stats = new MasterStats();
|
||||
|
@ -206,6 +216,7 @@ public class DruidMasterRuntimeParams
|
|||
Set<DruidDataSource> dataSources,
|
||||
Set<DataSegment> availableSegments,
|
||||
Map<String, LoadQueuePeon> loadManagementPeons,
|
||||
ReplicationThrottler replicationManager,
|
||||
ServiceEmitter emitter,
|
||||
long millisToWaitBeforeDeleting,
|
||||
MasterStats stats,
|
||||
|
@ -220,6 +231,7 @@ public class DruidMasterRuntimeParams
|
|||
this.dataSources = dataSources;
|
||||
this.availableSegments = availableSegments;
|
||||
this.loadManagementPeons = loadManagementPeons;
|
||||
this.replicationManager = replicationManager;
|
||||
this.emitter = emitter;
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
|
||||
this.stats = stats;
|
||||
|
@ -237,6 +249,7 @@ public class DruidMasterRuntimeParams
|
|||
dataSources,
|
||||
availableSegments,
|
||||
loadManagementPeons,
|
||||
replicationManager,
|
||||
emitter,
|
||||
millisToWaitBeforeDeleting,
|
||||
stats,
|
||||
|
@ -287,6 +300,12 @@ public class DruidMasterRuntimeParams
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder withReplicationManager(ReplicationThrottler replicationManager)
|
||||
{
|
||||
this.replicationManager = replicationManager;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withEmitter(ServiceEmitter emitter)
|
||||
{
|
||||
this.emitter = emitter;
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
/**
|
||||
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
|
||||
*/
|
||||
public class ReplicationThrottler
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
|
||||
private final int maxReplicants;
|
||||
private final int maxLifetime;
|
||||
|
||||
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
|
||||
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
|
||||
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
|
||||
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
|
||||
|
||||
public ReplicationThrottler(int maxReplicants, int maxLifetime)
|
||||
{
|
||||
this.maxReplicants = maxReplicants;
|
||||
this.maxLifetime = maxLifetime;
|
||||
}
|
||||
|
||||
public void updateReplicationState(String tier)
|
||||
{
|
||||
update(tier, currentlyReplicating, replicatingLookup, "create");
|
||||
}
|
||||
|
||||
public void updateTerminationState(String tier)
|
||||
{
|
||||
update(tier, currentlyTerminating, terminatingLookup, "terminate");
|
||||
}
|
||||
|
||||
private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
|
||||
{
|
||||
int size = holder.getNumProcessing(tier);
|
||||
if (size != 0) {
|
||||
log.info(
|
||||
"[%s]: Replicant %s queue still has %d segments. Lifetime[%d]",
|
||||
tier,
|
||||
type,
|
||||
size,
|
||||
holder.getLifetime(tier)
|
||||
);
|
||||
holder.reduceLifetime(tier);
|
||||
lookup.put(tier, false);
|
||||
|
||||
if (holder.getLifetime(tier) < 0) {
|
||||
log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
|
||||
}
|
||||
} else {
|
||||
log.info("[%s]: Replicant %s queue is empty.", tier, type);
|
||||
lookup.put(tier, true);
|
||||
holder.resetLifetime(tier);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean canAddReplicant(String tier)
|
||||
{
|
||||
return replicatingLookup.get(tier);
|
||||
}
|
||||
|
||||
public boolean canDestroyReplicant(String tier)
|
||||
{
|
||||
return terminatingLookup.get(tier);
|
||||
}
|
||||
|
||||
public boolean registerReplicantCreation(String tier, String segmentId)
|
||||
{
|
||||
return currentlyReplicating.addSegment(tier, segmentId);
|
||||
}
|
||||
|
||||
public void unregisterReplicantCreation(String tier, String segmentId)
|
||||
{
|
||||
currentlyReplicating.removeSegment(tier, segmentId);
|
||||
}
|
||||
|
||||
public boolean registerReplicantTermination(String tier, String segmentId)
|
||||
{
|
||||
return currentlyTerminating.addSegment(tier, segmentId);
|
||||
}
|
||||
|
||||
public void unregisterReplicantTermination(String tier, String segmentId)
|
||||
{
|
||||
currentlyTerminating.removeSegment(tier, segmentId);
|
||||
}
|
||||
|
||||
private class ReplicatorSegmentHolder
|
||||
{
|
||||
private final Map<String, ConcurrentSkipListSet<String>> currentlyProcessingSegments = Maps.newHashMap();
|
||||
private final Map<String, Integer> lifetimes = Maps.newHashMap();
|
||||
|
||||
public boolean addSegment(String tier, String segmentId)
|
||||
{
|
||||
ConcurrentSkipListSet<String> segments = currentlyProcessingSegments.get(tier);
|
||||
if (segments == null) {
|
||||
segments = new ConcurrentSkipListSet<String>();
|
||||
currentlyProcessingSegments.put(tier, segments);
|
||||
}
|
||||
if (segments.size() < maxReplicants) {
|
||||
segments.add(segmentId);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void removeSegment(String tier, String segmentId)
|
||||
{
|
||||
Set<String> segments = currentlyProcessingSegments.get(tier);
|
||||
if (segments != null) {
|
||||
segments.remove(segmentId);
|
||||
}
|
||||
}
|
||||
|
||||
public int getNumProcessing(String tier)
|
||||
{
|
||||
Set<String> segments = currentlyProcessingSegments.get(tier);
|
||||
return (segments == null) ? 0 : segments.size();
|
||||
}
|
||||
|
||||
public int getLifetime(String tier)
|
||||
{
|
||||
Integer lifetime = lifetimes.get(tier);
|
||||
if (lifetime == null) {
|
||||
lifetime = maxLifetime;
|
||||
lifetimes.put(tier, lifetime);
|
||||
}
|
||||
return lifetime;
|
||||
}
|
||||
|
||||
public void reduceLifetime(String tier)
|
||||
{
|
||||
Integer lifetime = lifetimes.get(tier);
|
||||
if (lifetime == null) {
|
||||
lifetime = maxLifetime;
|
||||
lifetimes.put(tier, lifetime);
|
||||
}
|
||||
lifetimes.put(tier, --lifetime);
|
||||
}
|
||||
|
||||
public void resetLifetime(String tier)
|
||||
{
|
||||
lifetimes.put(tier, maxLifetime);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import com.metamx.druid.master.DruidMaster;
|
|||
import com.metamx.druid.master.DruidMasterRuntimeParams;
|
||||
import com.metamx.druid.master.LoadPeonCallback;
|
||||
import com.metamx.druid.master.MasterStats;
|
||||
import com.metamx.druid.master.ReplicationThrottler;
|
||||
import com.metamx.druid.master.ServerHolder;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
||||
|
@ -55,17 +56,18 @@ public abstract class LoadRule implements Rule
|
|||
return stats;
|
||||
}
|
||||
|
||||
stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
|
||||
stats.accumulate(assign(params.getReplicationManager(), expectedReplicants, totalReplicants, serverQueue, segment));
|
||||
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
private MasterStats assign(
|
||||
final ReplicationThrottler replicationManager,
|
||||
int expectedReplicants,
|
||||
int totalReplicants,
|
||||
MinMaxPriorityQueue<ServerHolder> serverQueue,
|
||||
DataSegment segment
|
||||
final DataSegment segment
|
||||
)
|
||||
{
|
||||
MasterStats stats = new MasterStats();
|
||||
|
@ -109,6 +111,14 @@ public abstract class LoadRule implements Rule
|
|||
break;
|
||||
}
|
||||
|
||||
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
|
||||
if (!replicationManager.canAddReplicant(getTier()) ||
|
||||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
|
||||
serverQueue.add(holder);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
holder.getPeon().loadSegment(
|
||||
segment,
|
||||
new LoadPeonCallback()
|
||||
|
@ -116,6 +126,7 @@ public abstract class LoadRule implements Rule
|
|||
@Override
|
||||
protected void execute()
|
||||
{
|
||||
replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -132,11 +143,12 @@ public abstract class LoadRule implements Rule
|
|||
private MasterStats drop(
|
||||
int expectedReplicants,
|
||||
int clusterReplicants,
|
||||
DataSegment segment,
|
||||
DruidMasterRuntimeParams params
|
||||
final DataSegment segment,
|
||||
final DruidMasterRuntimeParams params
|
||||
)
|
||||
{
|
||||
MasterStats stats = new MasterStats();
|
||||
final ReplicationThrottler replicationManager = params.getReplicationManager();
|
||||
|
||||
if (!params.hasDeletionWaitTimeElapsed()) {
|
||||
return stats;
|
||||
|
@ -168,6 +180,14 @@ public abstract class LoadRule implements Rule
|
|||
break;
|
||||
}
|
||||
|
||||
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
|
||||
if (!replicationManager.canDestroyReplicant(getTier()) ||
|
||||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
|
||||
serverQueue.add(holder);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (holder.isServingSegment(segment)) {
|
||||
holder.getPeon().dropSegment(
|
||||
segment,
|
||||
|
@ -176,6 +196,7 @@ public abstract class LoadRule implements Rule
|
|||
@Override
|
||||
protected void execute()
|
||||
{
|
||||
replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
@ -52,6 +53,7 @@ public class SegmentChangeRequestDropTest
|
|||
Arrays.asList("dim1", "dim2"),
|
||||
Arrays.asList("met1", "met2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
1
|
||||
);
|
||||
|
||||
|
@ -61,7 +63,7 @@ public class SegmentChangeRequestDropTest
|
|||
mapper.writeValueAsString(segmentDrop), new TypeReference<Map<String, Object>>(){}
|
||||
);
|
||||
|
||||
Assert.assertEquals(10, objectMap.size());
|
||||
Assert.assertEquals(11, objectMap.size());
|
||||
Assert.assertEquals("drop", objectMap.get("action"));
|
||||
Assert.assertEquals("something", objectMap.get("dataSource"));
|
||||
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
|
||||
|
@ -70,6 +72,7 @@ public class SegmentChangeRequestDropTest
|
|||
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
|
||||
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
|
||||
Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
|
||||
Assert.assertEquals(IndexIO.CURRENT_VERSION_ID, objectMap.get("binaryVersion"));
|
||||
Assert.assertEquals(1, objectMap.get("size"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package com.metamx.druid.coordination;
|
|||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
@ -52,6 +53,7 @@ public class SegmentChangeRequestLoadTest
|
|||
Arrays.asList("dim1", "dim2"),
|
||||
Arrays.asList("met1", "met2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
1
|
||||
);
|
||||
|
||||
|
@ -61,7 +63,7 @@ public class SegmentChangeRequestLoadTest
|
|||
mapper.writeValueAsString(segmentDrop), new TypeReference<Map<String, Object>>(){}
|
||||
);
|
||||
|
||||
Assert.assertEquals(10, objectMap.size());
|
||||
Assert.assertEquals(11, objectMap.size());
|
||||
Assert.assertEquals("load", objectMap.get("action"));
|
||||
Assert.assertEquals("something", objectMap.get("dataSource"));
|
||||
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
|
||||
|
@ -70,6 +72,7 @@ public class SegmentChangeRequestLoadTest
|
|||
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
|
||||
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
|
||||
Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
|
||||
Assert.assertEquals(IndexIO.CURRENT_VERSION_ID, objectMap.get("binaryVersion"));
|
||||
Assert.assertEquals(1, objectMap.get("size"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.index.QueryableIndex;
|
||||
import com.metamx.druid.index.Segment;
|
||||
import com.metamx.druid.index.brita.Filter;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.index.v1.SegmentIdAttachedStorageAdapter;
|
||||
import com.metamx.druid.index.v1.processing.Cursor;
|
||||
import com.metamx.druid.loading.SegmentLoader;
|
||||
|
@ -238,6 +239,7 @@ public class ServerManagerTest
|
|||
Arrays.asList("dim1", "dim2", "dim3"),
|
||||
Arrays.asList("metric1", "metric2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
123l
|
||||
)
|
||||
);
|
||||
|
@ -259,6 +261,7 @@ public class ServerManagerTest
|
|||
Arrays.asList("dim1", "dim2", "dim3"),
|
||||
Arrays.asList("metric1", "metric2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
123l
|
||||
)
|
||||
);
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.DruidServerConfig;
|
||||
import com.metamx.druid.client.ZKPhoneBook;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.jackson.DefaultObjectMapper;
|
||||
import com.metamx.druid.loading.NoopSegmentLoader;
|
||||
import com.metamx.druid.metrics.NoopServiceEmitter;
|
||||
|
@ -196,6 +197,7 @@ public class ZkCoordinatorTest
|
|||
Arrays.asList("dim1", "dim2", "dim3"),
|
||||
Arrays.asList("metric1", "metric2"),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
123l
|
||||
);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
|
|||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.master.rules.IntervalDropRule;
|
||||
import com.metamx.druid.master.rules.IntervalLoadRule;
|
||||
import com.metamx.druid.master.rules.Rule;
|
||||
|
@ -78,13 +79,14 @@ public class DruidMasterRuleRunnerTest
|
|||
Lists.<String>newArrayList(),
|
||||
Lists.<String>newArrayList(),
|
||||
new NoneShardSpec(),
|
||||
IndexIO.CURRENT_VERSION_ID,
|
||||
1
|
||||
)
|
||||
);
|
||||
start = start.plusHours(1);
|
||||
}
|
||||
|
||||
ruleRunner = new DruidMasterRuleRunner(master);
|
||||
ruleRunner = new DruidMasterRuleRunner(master, 1, 24);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -848,4 +850,187 @@ public class DruidMasterRuleRunnerTest
|
|||
EasyMock.verify(mockPeon);
|
||||
EasyMock.verify(anotherMockPeon);
|
||||
}
|
||||
|
||||
/**
|
||||
* Nodes:
|
||||
* hot - 2 replicants
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testReplicantThrottle() throws Exception
|
||||
{
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
|
||||
EasyMock.replay(mockPeon);
|
||||
|
||||
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
|
||||
Lists.<Rule>newArrayList(
|
||||
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), 2, "hot")
|
||||
)
|
||||
).atLeastOnce();
|
||||
EasyMock.replay(databaseRuleManager);
|
||||
|
||||
DruidCluster druidCluster = new DruidCluster(
|
||||
ImmutableMap.of(
|
||||
"hot",
|
||||
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
|
||||
Arrays.asList(
|
||||
new ServerHolder(
|
||||
new DruidServer(
|
||||
"serverHot",
|
||||
"hostHot",
|
||||
1000,
|
||||
"historical",
|
||||
"hot"
|
||||
),
|
||||
mockPeon
|
||||
),
|
||||
new ServerHolder(
|
||||
new DruidServer(
|
||||
"serverHot2",
|
||||
"hostHot2",
|
||||
1000,
|
||||
"historical",
|
||||
"hot"
|
||||
),
|
||||
mockPeon
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
DruidMasterRuntimeParams params =
|
||||
new DruidMasterRuntimeParams.Builder()
|
||||
.withDruidCluster(druidCluster)
|
||||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
MasterStats stats = afterParams.getMasterStats();
|
||||
|
||||
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 48);
|
||||
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
|
||||
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
|
||||
|
||||
DataSegment overFlowSegment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2012-02-01/2012-02-02"),
|
||||
new DateTime().toString(),
|
||||
Maps.<String, Object>newHashMap(),
|
||||
Lists.<String>newArrayList(),
|
||||
Lists.<String>newArrayList(),
|
||||
new NoneShardSpec(),
|
||||
1,
|
||||
0
|
||||
);
|
||||
|
||||
afterParams = ruleRunner.run(
|
||||
new DruidMasterRuntimeParams.Builder()
|
||||
.withDruidCluster(druidCluster)
|
||||
.withEmitter(emitter)
|
||||
.withAvailableSegments(Arrays.asList(overFlowSegment))
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.build()
|
||||
);
|
||||
stats = afterParams.getMasterStats();
|
||||
|
||||
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
|
||||
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
|
||||
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
|
||||
|
||||
EasyMock.verify(mockPeon);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDropReplicantThrottle() throws Exception
|
||||
{
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
|
||||
EasyMock.replay(mockPeon);
|
||||
|
||||
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
|
||||
Lists.<Rule>newArrayList(
|
||||
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), 1, "normal")
|
||||
)
|
||||
).atLeastOnce();
|
||||
EasyMock.replay(databaseRuleManager);
|
||||
|
||||
DataSegment overFlowSegment = new DataSegment(
|
||||
"test",
|
||||
new Interval("2012-02-01/2012-02-02"),
|
||||
new DateTime().toString(),
|
||||
Maps.<String, Object>newHashMap(),
|
||||
Lists.<String>newArrayList(),
|
||||
Lists.<String>newArrayList(),
|
||||
new NoneShardSpec(),
|
||||
1,
|
||||
0
|
||||
);
|
||||
List<DataSegment> longerAvailableSegments = Lists.newArrayList(availableSegments);
|
||||
longerAvailableSegments.add(overFlowSegment);
|
||||
|
||||
DruidServer server1 = new DruidServer(
|
||||
"serverNorm1",
|
||||
"hostNorm1",
|
||||
1000,
|
||||
"historical",
|
||||
"normal"
|
||||
);
|
||||
for (DataSegment availableSegment : longerAvailableSegments) {
|
||||
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
|
||||
}
|
||||
DruidServer server2 = new DruidServer(
|
||||
"serverNorm2",
|
||||
"hostNorm2",
|
||||
1000,
|
||||
"historical",
|
||||
"normal"
|
||||
);
|
||||
for (DataSegment availableSegment : longerAvailableSegments) {
|
||||
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);
|
||||
}
|
||||
|
||||
DruidCluster druidCluster = new DruidCluster(
|
||||
ImmutableMap.of(
|
||||
"normal",
|
||||
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
|
||||
Arrays.asList(
|
||||
new ServerHolder(
|
||||
server1,
|
||||
mockPeon
|
||||
),
|
||||
new ServerHolder(
|
||||
server2,
|
||||
mockPeon
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
|
||||
|
||||
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
|
||||
.withDruidCluster(druidCluster)
|
||||
.withMillisToWaitBeforeDeleting(0L)
|
||||
.withAvailableSegments(longerAvailableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
MasterStats stats = afterParams.getMasterStats();
|
||||
|
||||
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
|
||||
EasyMock.verify(mockPeon);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package com.metamx.druid.master;
|
||||
|
||||
import com.metamx.common.concurrent.ScheduledExecutorFactory;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.ServerInventoryManager;
|
||||
|
@ -124,6 +123,18 @@ public class DruidMasterTest
|
|||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReplicantLifetime()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReplicantThrottleLimit()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
},
|
||||
null,
|
||||
null,
|
||||
|
|
Loading…
Reference in New Issue