diff --git a/client/pom.xml b/client/pom.xml
index a9a2efedf89..e737c55bc20 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/client/src/main/java/com/metamx/druid/Query.java b/client/src/main/java/com/metamx/druid/Query.java
index b553dd46855..bd1dc49702a 100644
--- a/client/src/main/java/com/metamx/druid/Query.java
+++ b/client/src/main/java/com/metamx/druid/Query.java
@@ -21,6 +21,7 @@ package com.metamx.druid;
import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.group.GroupByQuery;
+import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker;
@@ -39,7 +40,8 @@ import java.util.Map;
@JsonSubTypes.Type(name = Query.TIMESERIES, value = TimeseriesQuery.class),
@JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class),
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
- @JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class)
+ @JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class),
+ @JsonSubTypes.Type(name = "segmentMetadata", value= SegmentMetadataQuery.class)
})
public interface Query<T>
{
diff --git a/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java b/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java
index d316e8fcab5..d053d6507a2 100644
--- a/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java
+++ b/client/src/main/java/com/metamx/druid/http/ClientQuerySegmentWalker.java
@@ -83,7 +83,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
}
},
toolChest.preMergeQueryDecoration(baseClient)
- )
+ ).withWaitMeasuredFromNow()
)
),
toolChest
diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java
new file mode 100644
index 00000000000..2e453ddbe0b
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java
@@ -0,0 +1,72 @@
+package com.metamx.druid.query;
+
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
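+/**
+ * ExecutorService decorator that reports queue depth: each time a task is
+ * submitted, the current backlog of the delegate pool is emitted as an
+ * "exec/backlog" metric. Backlog is only measurable when the delegate is a
+ * ThreadPoolExecutor, the only type here that exposes its work queue.
+ */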
+public class MetricsEmittingExecutorService extends AbstractExecutorService
+{
+ private final ExecutorService base;
+ private final ServiceEmitter emitter;
+ private final ServiceMetricEvent.Builder metricBuilder;
+
+ public MetricsEmittingExecutorService(
+ ExecutorService base,
+ ServiceEmitter emitter,
+ ServiceMetricEvent.Builder metricBuilder
+ )
+ {
+ this.base = base;
+ this.emitter = emitter;
+ this.metricBuilder = metricBuilder;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ base.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ return base.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return base.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return base.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
+ {
+ return base.awaitTermination(l, timeUnit);
+ }
+
+ @Override
+ public void execute(Runnable runnable)
+ {
+ emitMetrics();
+ base.execute(runnable);
+ }
+
+ private void emitMetrics()
+ {
+ if (base instanceof ThreadPoolExecutor) {
+ emitter.emit(metricBuilder.build("exec/backlog", ((ThreadPoolExecutor) base).getQueue().size()));
+ }
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java
index 44db0ac3893..9a7111526de 100644
--- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java
+++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingQueryRunner.java
@@ -37,16 +37,33 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
+ private final long creationTime;
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner
)
+ {
+ this(emitter, builderFn, queryRunner, -1);
+ }
+
+ public MetricsEmittingQueryRunner(
+ ServiceEmitter emitter,
+ Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
+ QueryRunner<T> queryRunner,
+ long creationTime
+ )
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryRunner = queryRunner;
+ this.creationTime = creationTime;
+ }
+
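+ /**
+ * Returns a copy of this runner stamped with the current time, so the copy
+ * also emits a "query/wait" metric: the gap between runner creation
+ * (queueing) and the moment the query actually starts executing.
+ */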
+ public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
+ {
+ return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
}
@Override
@@ -77,6 +94,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
+
+ if(creationTime > 0) {
+ emitter.emit(builder.build("query/wait", startTime - creationTime));
+ }
}
return retVal;
@@ -149,6 +170,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken));
+ if(creationTime > 0) {
+ emitter.emit(builder.build("query/wait", startTime - creationTime));
+ }
+
yielder.close();
}
};
diff --git a/client/src/main/java/com/metamx/druid/query/Queries.java b/client/src/main/java/com/metamx/druid/query/Queries.java
index 9d8e052c0c0..ae5f958b563 100644
--- a/client/src/main/java/com/metamx/druid/query/Queries.java
+++ b/client/src/main/java/com/metamx/druid/query/Queries.java
@@ -20,6 +20,7 @@
package com.metamx.druid.query;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -58,9 +59,12 @@ public class Queries
);
for (PostAggregator postAgg : postAggs) {
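+ // A post-aggregator may reference only aggregators or post-aggregators
+ // defined before it; report exactly which required fields are absent.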
+ Set<String> dependencies = postAgg.getDependentFields();
+ Set<String> missing = Sets.difference(dependencies, combinedAggNames);
+
Preconditions.checkArgument(
- postAgg.verifyFields(combinedAggNames),
- String.format("Missing field[%s]", postAgg.getName())
+ missing.isEmpty(),
+ "Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
combinedAggNames.add(postAgg.getName());
}
diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
index 541d5342c73..071d7695c0f 100644
--- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
+++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java
@@ -43,8 +43,8 @@ import java.util.Map;
public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
{
public static final Interval MY_Y2K_INTERVAL = new Interval(
- new DateTime(Long.MIN_VALUE),
- new DateTime(Long.MAX_VALUE)
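+ // DateTimes built from Long.MIN_VALUE/MAX_VALUE fall outside the instant
+ // range Joda-Time supports, so use a wide but representable interval.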
+ new DateTime("0000-01-01"),
+ new DateTime("3000-01-01")
);
public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime";
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
index 702bf7a2a56..2fa7d3b4193 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
@@ -24,6 +24,8 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
private Cache cache;
private static byte[] randBytes;
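+ // Whitespace- or comma-separated host:port pairs, parsed by AddrUtil below.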
+ @Param({"localhost:11211"}) String hosts;
+
// object size in kB
@Param({"1", "5", "10", "40"}) int objectSize;
@Param({"100", "1000"}) int objectCount;
@@ -37,6 +39,8 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
// disable compression
transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+ System.out.println(String.format("Using memcached hosts [%s]", hosts));
+
client = new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@@ -46,7 +50,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
.setTranscoder(transcoder)
.setShouldOptimize(true)
.build(),
- AddrUtil.getAddresses("localhost:11211")
+ AddrUtil.getAddresses(hosts)
);
broker = new MemcachedCacheBroker(
diff --git a/common/pom.xml b/common/pom.xml
index 344e709443b..ec21eb37008 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java
index 035d0fa6652..cec19d80d78 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/post/ArithmeticPostAggregator.java
@@ -20,6 +20,7 @@
package com.metamx.druid.aggregation.post;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.metamx.common.IAE;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -69,14 +70,13 @@ public class ArithmeticPostAggregator implements PostAggregator
}
@Override
- public boolean verifyFields(Set<String> fieldNames)
+ public Set<String> getDependentFields()
{
+ Set<String> dependentFields = Sets.newHashSet();
for (PostAggregator field : fields) {
- if (!field.verifyFields(fieldNames)) {
- return false;
- }
+ dependentFields.addAll(field.getDependentFields());
}
- return true;
+ return dependentFields;
}
@Override
diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java
index 645de6c9b80..f1bbb0d8392 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/post/ConstantPostAggregator.java
@@ -19,6 +19,7 @@
package com.metamx.druid.aggregation.post;
+import com.google.common.collect.Sets;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -44,9 +45,9 @@ public class ConstantPostAggregator implements PostAggregator
}
@Override
- public boolean verifyFields(Set<String> fields)
+ public Set<String> getDependentFields()
{
- return true;
+ return Sets.newHashSet();
}
@Override
diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java
index 0a1c866d044..780c720103d 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/post/FieldAccessPostAggregator.java
@@ -19,6 +19,7 @@
package com.metamx.druid.aggregation.post;
+import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -45,9 +46,9 @@ public class FieldAccessPostAggregator implements PostAggregator
}
@Override
- public boolean verifyFields(Set<String> fieldNames)
+ public Set<String> getDependentFields()
{
- return fieldNames.contains(fieldName);
+ return Sets.newHashSet(fieldName);
}
@Override
diff --git a/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java
index 487ac30efb3..5b1ebc60528 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/post/PostAggregator.java
@@ -37,7 +37,7 @@ import java.util.Set;
})
public interface PostAggregator
{
- public boolean verifyFields(Set<String> fieldNames);
+ public Set<String> getDependentFields();
public Comparator getComparator();
diff --git a/common/src/test/java/com/metamx/druid/QueryGranularityTest.java b/common/src/test/java/com/metamx/druid/QueryGranularityTest.java
index 18f39ef3a1b..de0a297c569 100644
--- a/common/src/test/java/com/metamx/druid/QueryGranularityTest.java
+++ b/common/src/test/java/com/metamx/druid/QueryGranularityTest.java
@@ -181,7 +181,7 @@ public class QueryGranularityTest
);
}
- //@Test
+ @Test
public void testPeriodDaylightSaving() throws Exception
{
final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");
diff --git a/druid-services/pom.xml b/druid-services/pom.xml
index 5a1ad70fb57..81af25085e9 100644
--- a/druid-services/pom.xml
+++ b/druid-services/pom.xml
@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/examples/pom.xml b/examples/pom.xml
index 16f4976302c..d1aae11058e 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml
index 6cf3453b17e..ae6b1282605 100644
--- a/examples/rand/pom.xml
+++ b/examples/rand/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml
index f0e1e9d4276..40420b8b2f3 100644
--- a/examples/twitter/pom.xml
+++ b/examples/twitter/pom.xml
@@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/index-common/pom.xml b/index-common/pom.xml
index 767e0e14685..c5ca45c1002 100644
--- a/index-common/pom.xml
+++ b/index-common/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/indexer/pom.xml b/indexer/pom.xml
index 8e6ff353a2e..3ab32ed0b90 100644
--- a/indexer/pom.xml
+++ b/indexer/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/merger/pom.xml b/merger/pom.xml
index f605d10111f..1ea3478f7b9 100644
--- a/merger/pom.xml
+++ b/merger/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index 35fe72db968..2a235b88d86 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -211,10 +211,10 @@ public class RemoteTaskRunner implements TaskRunner
} else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
- log.makeAlert(
- "It has been %d millis since last scheduled termination but nodes remain",
- durSinceLastTerminate.getMillis()
- ).emit();
+ log.makeAlert("Worker node termination taking too long")
+ .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
+ .addData("terminatingCount", currentlyTerminating.size())
+ .emit();
}
log.info(
@@ -330,7 +330,9 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Registering retry for failed task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) {
- log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries())
+ log.makeAlert("Task exceeded maximum retry count")
+ .addData("task", task.getId())
+ .addData("retryCount", retryPolicy.getNumRetries())
.emit();
return;
}
@@ -542,10 +544,10 @@ public class RemoteTaskRunner implements TaskRunner
} else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
- log.makeAlert(
- "It has been %d millis since last scheduled provision but nodes remain",
- durSinceLastProvision.getMillis()
- ).emit();
+ log.makeAlert("Worker node provisioning taking too long")
+ .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
+ .addData("provisioningCount", currentlyProvisioning.size())
+ .emit();
}
log.info(
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
index 201bfdf338d..ed7ac9f3f25 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
@@ -20,11 +20,9 @@
package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
@@ -35,12 +33,9 @@ import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.VersionedTaskWrapper;
import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
-import java.util.concurrent.ExecutorService;
-
public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
@@ -242,17 +237,12 @@ public class TaskConsumer implements Runnable
emitter.emit(builder.build("indexer/segment/bytes", bytes));
if (status.isFailure()) {
- emitter.emit(
- new AlertEvent.Builder().build(
- String.format("Failed to index: %s", task.getDataSource()),
- ImmutableMap.builder()
- .put("task", task.getId())
- .put("type", task.getType().toString())
- .put("dataSource", task.getDataSource())
- .put("interval", task.getInterval())
- .build()
- )
- );
+ log.makeAlert("Failed to index")
+ .addData("task", task.getId())
+ .addData("type", task.getType().toString())
+ .addData("dataSource", task.getDataSource())
+ .addData("interval", task.getInterval())
+ .emit();
}
log.info(
diff --git a/pom.xml b/pom.xml
index 9d33d272966..b94153ac887 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 71ae520036c..6a0e30f2c8e 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/server/pom.xml b/server/pom.xml
index 62e2784f636..88bfc7e2982 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.1.15-SNAPSHOT</version>
+ <version>0.1.26-SNAPSHOT</version>
</parent>
diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
index 1d3675e570d..9d0e8bbdfa2 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java
@@ -336,7 +336,7 @@ public class ServerManager implements QuerySegmentWalker
adapter.getInterval().getStart(),
factory.createRunner(adapter)
)
- ),
+ ).withWaitMeasuredFromNow(),
segmentSpec
);
}
diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java
index 021ad751ad6..39f43bfcde2 100644
--- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java
+++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java
@@ -40,9 +40,11 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor;
+import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory;
@@ -118,13 +120,16 @@ public class ComputeNode extends BaseServerNode
final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
- final ExecutorService executorService = ExecutorServices.create(
- getLifecycle(),
- getConfigFactory().buildWithReplacements(
- ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
- )
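+ // Wrap the processing pool so its backlog is emitted as a metric on every
+ // task submission.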
+ final ExecutorService executorService = new MetricsEmittingExecutorService(
+ ExecutorServices.create(
+ getLifecycle(),
+ getConfigFactory().buildWithReplacements(
+ ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
+ )
+ ), emitter, new ServiceMetricEvent.Builder()
);
- ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
+
+ final ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(),
diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java
index 315acdf8ce8..800e3a93e46 100644
--- a/server/src/main/java/com/metamx/druid/http/InfoResource.java
+++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java
@@ -33,7 +33,6 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
-import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.master.rules.Rule;
import javax.annotation.Nullable;
@@ -47,6 +46,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -556,7 +556,7 @@ public class InfoResource
return builder.entity(databaseSegmentManager.getInventory()).build();
}
- return builder.entity(
+ List<String> dataSourceNames = Lists.newArrayList(
Iterables.transform(
databaseSegmentManager.getInventory(),
new Function<DruidDataSource, String>()
@@ -568,7 +568,11 @@ public class InfoResource
}
}
)
- ).build();
+ );
+
+ Collections.sort(dataSourceNames);
+
+ return builder.entity(dataSourceNames).build();
}
@GET
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
index 56d27f47544..5c33a257f26 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
@@ -23,6 +23,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.Rule;
import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;
import java.util.List;
@@ -51,13 +52,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
}
// Run through all matched rules for available segments
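+ // Capture one reference time for the whole run so every rule is evaluated
+ // against the same instant, making rule evaluation deterministic.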
+ DateTime now = new DateTime();
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
for (DataSegment segment : params.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
- if (rule.appliesTo(segment)) {
+ if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(master, params, segment));
foundMatchingRule = true;
break;
diff --git a/server/src/main/java/com/metamx/druid/master/SegmentReplicantLookup.java b/server/src/main/java/com/metamx/druid/master/SegmentReplicantLookup.java
index 5c5d3bee655..560787247f5 100644
--- a/server/src/main/java/com/metamx/druid/master/SegmentReplicantLookup.java
+++ b/server/src/main/java/com/metamx/druid/master/SegmentReplicantLookup.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table;
import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import java.util.Map;
@@ -37,51 +36,72 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
+ final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serversByType) {
DruidServer server = serverHolder.getServer();
- for (DruidDataSource dataSource : server.getDataSources()) {
- for (DataSegment segment : dataSource.getSegments()) {
- Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier());
- if (numReplicants == null) {
- numReplicants = 0;
- }
- segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
- }
- }
-
- // Also account for queued segments
- for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
+ for (DataSegment segment : server.getSegments().values()) {
Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
+
+ // Also account for queued segments
+ for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
+ Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
+ if (numReplicants == null) {
+ numReplicants = 0;
+ }
+ loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
+ }
}
}
- return new SegmentReplicantLookup(segmentsInCluster);
+ return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
}
- private final Table<String, String, Integer> table;
+ private final Table<String, String, Integer> segmentsInCluster;
+ private final Table<String, String, Integer> loadingSegments;
- private SegmentReplicantLookup(Table<String, String, Integer> table)
+ private SegmentReplicantLookup(
+ Table<String, String, Integer> segmentsInCluster,
+ Table<String, String, Integer> loadingSegments
+ )
{
- this.table = table;
+ this.segmentsInCluster = segmentsInCluster;
+ this.loadingSegments = loadingSegments;
}
- public Map<String, Integer> getTiers(String segmentId)
+ public Map<String, Integer> getClusterTiers(String segmentId)
{
- Map<String, Integer> retVal = table.row(segmentId);
+ Map<String, Integer> retVal = segmentsInCluster.row(segmentId);
return (retVal == null) ? Maps.newHashMap() : retVal;
}
- public int lookup(String segmentId, String tier)
+ public Map<String, Integer> getLoadingTiers(String segmentId)
+ {
+ Map<String, Integer> retVal = loadingSegments.row(segmentId);
+ return (retVal == null) ? Maps.newHashMap() : retVal;
+ }
+
+ public int getClusterReplicants(String segmentId, String tier)
{
- Integer retVal = table.get(segmentId, tier);
+ Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
+
+ public int getLoadingReplicants(String segmentId, String tier)
+ {
+ Integer retVal = loadingSegments.get(segmentId, tier);
+ return (retVal == null) ? 0 : retVal;
+ }
+
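+ // Served replicants plus replicants still waiting in load queues.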
+ public int getTotalReplicants(String segmentId, String tier)
+ {
+ return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+ }
}
diff --git a/server/src/main/java/com/metamx/druid/master/ServerHolder.java b/server/src/main/java/com/metamx/druid/master/ServerHolder.java
index 6cf4d65ce3f..5fba424f429 100644
--- a/server/src/main/java/com/metamx/druid/master/ServerHolder.java
+++ b/server/src/main/java/com/metamx/druid/master/ServerHolder.java
@@ -94,9 +94,14 @@ public class ServerHolder implements Comparable<ServerHolder>
return availableSize;
}
- public boolean containsSegment(DataSegment segment)
+ public boolean isServingSegment(DataSegment segment)
{
- return (server.getSegment(segment.getIdentifier()) != null || peon.getSegmentsToLoad().contains(segment));
+ return (server.getSegment(segment.getIdentifier()) != null);
+ }
+
+ public boolean isLoadingSegment(DataSegment segment)
+ {
+ return peon.getSegmentsToLoad().contains(segment);
}
@Override
diff --git a/server/src/main/java/com/metamx/druid/master/rules/IntervalDropRule.java b/server/src/main/java/com/metamx/druid/master/rules/IntervalDropRule.java
index 6546fce40b3..0acdd8bc2f8 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/IntervalDropRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/IntervalDropRule.java
@@ -22,6 +22,7 @@ package com.metamx.druid.master.rules;
import com.metamx.druid.client.DataSegment;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
@@ -52,7 +53,7 @@ public class IntervalDropRule extends DropRule
}
@Override
- public boolean appliesTo(DataSegment segment)
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return interval.contains(segment.getInterval());
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/IntervalLoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/IntervalLoadRule.java
index 8c77594a177..5aa984ccba8 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/IntervalLoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/IntervalLoadRule.java
@@ -23,6 +23,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
@@ -81,7 +82,7 @@ public class IntervalLoadRule extends LoadRule
}
@Override
- public boolean appliesTo(DataSegment segment)
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return interval.contains(segment.getInterval());
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index b3d256d2066..5bf232c9474 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -22,7 +22,6 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
-import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
@@ -47,7 +46,8 @@ public abstract class LoadRule implements Rule
MasterStats stats = new MasterStats();
int expectedReplicants = getReplicants();
- int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), getTier());
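+ // Assignment counts queued loads (total replicants) so the same copy is not
+ // assigned twice, while drops count only served replicants so a segment is
+ // never dropped before its replacement copies finish loading.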
+ int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier());
+ int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier());
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) {
@@ -55,15 +55,15 @@ public abstract class LoadRule implements Rule
return stats;
}
- stats.accumulate(assign(expectedReplicants, actualReplicants, serverQueue, segment));
- stats.accumulate(drop(expectedReplicants, actualReplicants, segment, params));
+ stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
+ stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
return stats;
}
private MasterStats assign(
int expectedReplicants,
- int actualReplicants,
+ int totalReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
DataSegment segment
)
@@ -71,7 +71,7 @@ public abstract class LoadRule implements Rule
MasterStats stats = new MasterStats();
List<ServerHolder> assignedServers = Lists.newArrayList();
- while (actualReplicants < expectedReplicants) {
+ while (totalReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
@@ -83,7 +83,8 @@ public abstract class LoadRule implements Rule
);
break;
}
- if (holder.containsSegment(segment)) {
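+ // Skip servers that already serve or are loading the segment, but put them
+ // back in the queue so they remain eligible for other segments.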
+ if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) {
+ assignedServers.add(holder);
continue;
}
@@ -121,7 +122,7 @@ public abstract class LoadRule implements Rule
assignedServers.add(holder);
stats.addToTieredStat("assignedCount", getTier(), 1);
- ++actualReplicants;
+ ++totalReplicants;
}
serverQueue.addAll(assignedServers);
@@ -130,7 +131,7 @@ public abstract class LoadRule implements Rule
private MasterStats drop(
int expectedReplicants,
- int actualReplicants,
+ int clusterReplicants,
DataSegment segment,
DruidMasterRuntimeParams params
)
@@ -142,11 +143,11 @@ public abstract class LoadRule implements Rule
}
// Make sure we have enough actual replicants in the cluster before doing anything
- if (actualReplicants < expectedReplicants) {
+ if (clusterReplicants < expectedReplicants) {
return stats;
}
- Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getTiers(segment.getIdentifier());
+ Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey();
@@ -163,23 +164,25 @@ public abstract class LoadRule implements Rule
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
- log.warn("Wtf, holder was null? Do I have no servers[%s]?", serverQueue);
- continue;
+ log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
+ break;
}
- holder.getPeon().dropSegment(
- segment,
- new LoadPeonCallback()
- {
- @Override
- protected void execute()
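+ // Only issue drops to servers actually serving the segment; holders that
+ // merely have it queued for load are left alone.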
+ if (holder.isServingSegment(segment)) {
+ holder.getPeon().dropSegment(
+ segment,
+ new LoadPeonCallback()
{
+ @Override
+ protected void execute()
+ {
+ }
}
- }
- );
+ );
+ --actualNumReplicantsForType;
+ stats.addToTieredStat("droppedCount", tier, 1);
+ }
droppedServers.add(holder);
- --actualNumReplicantsForType;
- stats.addToTieredStat("droppedCount", tier, 1);
}
serverQueue.addAll(droppedServers);
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/PeriodDropRule.java b/server/src/main/java/com/metamx/druid/master/rules/PeriodDropRule.java
index ce3c472a28f..152f074dc3a 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/PeriodDropRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/PeriodDropRule.java
@@ -54,9 +54,9 @@ public class PeriodDropRule extends DropRule
}
@Override
- public boolean appliesTo(DataSegment segment)
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
- final Interval currInterval = new Interval(new DateTime().minus(period), period);
+ final Interval currInterval = new Interval(period, referenceTimestamp);
return currInterval.contains(segment.getInterval());
}
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/PeriodLoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/PeriodLoadRule.java
index 84128e4a74d..051967e65ab 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/PeriodLoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/PeriodLoadRule.java
@@ -81,9 +81,9 @@ public class PeriodLoadRule extends LoadRule
}
@Override
- public boolean appliesTo(DataSegment segment)
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
- final Interval currInterval = new Interval(new DateTime().minus(period), period);
+ final Interval currInterval = new Interval(period, referenceTimestamp);
return currInterval.overlaps(segment.getInterval());
}
}
diff --git a/server/src/main/java/com/metamx/druid/master/rules/Rule.java b/server/src/main/java/com/metamx/druid/master/rules/Rule.java
index a6fbfa358cc..a6eced93c68 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/Rule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/Rule.java
@@ -25,6 +25,7 @@ import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.MasterStats;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
+import org.joda.time.DateTime;
/**
*/
@@ -40,7 +41,7 @@ public interface Rule
{
public String getType();
- public boolean appliesTo(DataSegment segment);
+ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp);
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment);
}
diff --git a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java b/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java
index edc4ea3eda8..7522b4b4750 100644
--- a/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java
+++ b/server/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryEngine.java
@@ -72,7 +72,7 @@ public class SegmentMetadataQueryEngine
final Indexed<String> lookup = adapter.getDimValueLookup(input);
for (String dimVal : lookup) {
ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
- size += index.size() * Charsets.UTF_8.encode(dimVal).capacity();
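+ // The null dimension value (rows lacking this dimension) contributes no
+ // bytes to the estimated size.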
+ size += (dimVal == null) ? 0 : index.size() * Charsets.UTF_8.encode(dimVal).capacity();
}
return new SegmentMetadataResultValue.Dimension(
size,
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
index c31dc670b82..2ac32578cbf 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java
@@ -31,7 +31,6 @@ import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule;
-import com.metamx.druid.master.rules.RuleMap;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@@ -86,14 +85,6 @@ public class DruidMasterRuleRunnerTest
}
ruleRunner = new DruidMasterRuleRunner(master);
-
- mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expectLastCall().anyTimes();
- mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
- EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
- EasyMock.replay(mockPeon);
}
@After
@@ -113,6 +104,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
@@ -202,6 +199,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
@@ -284,6 +287,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -356,6 +365,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
emitter.emit(EasyMock.anyObject());
EasyMock.expectLastCall().times(12);
EasyMock.replay(emitter);
@@ -455,6 +470,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropRemove() throws Exception
{
+ mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
master.removeSegment(EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
@@ -513,6 +534,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInSameTier() throws Exception
{
+ mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
@@ -581,6 +608,14 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -653,6 +688,12 @@ public class DruidMasterRuleRunnerTest
@Test
public void testDontDropInDifferentTiers() throws Exception
{
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Lists.newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@@ -716,5 +757,95 @@ public class DruidMasterRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
+
+ EasyMock.verify(mockPeon);
+ }
+
+ @Test
+ public void testDropServerActuallyServesSegment() throws Exception
+ {
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Lists.newArrayList(
+ new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
+ )
+ ).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DruidServer server1 = new DruidServer(
+ "server1",
+ "host1",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
+ DruidServer server2 = new DruidServer(
+ "serverNorm2",
+ "hostNorm2",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
+ DruidServer server3 = new DruidServer(
+ "serverNorm3",
+ "hostNorm3",
+ 1000,
+ "historical",
+ "normal"
+ );
+ server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
+ server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
+
+ mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+ EasyMock.replay(mockPeon);
+
+ LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
+ EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
+ EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
+ EasyMock.replay(anotherMockPeon);
+
+ DruidCluster druidCluster = new DruidCluster(
+ ImmutableMap.of(
+ "normal",
+ MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+ Arrays.asList(
+ new ServerHolder(
+ server1,
+ mockPeon
+ ),
+ new ServerHolder(
+ server2,
+ anotherMockPeon
+ ),
+ new ServerHolder(
+ server3,
+ anotherMockPeon
+ )
+ )
+ )
+ )
+ );
+
+ SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
+
+ DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
+ .withDruidCluster(druidCluster)
+ .withMillisToWaitBeforeDeleting(0L)
+ .withAvailableSegments(availableSegments)
+ .withDatabaseRuleManager(databaseRuleManager)
+ .withSegmentReplicantLookup(segmentReplicantLookup)
+ .build();
+
+ DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
+ MasterStats stats = afterParams.getMasterStats();
+
+ Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
+
+ EasyMock.verify(mockPeon);
+ EasyMock.verify(anotherMockPeon);
}
}
diff --git a/server/src/test/java/com/metamx/druid/master/rules/PeriodDropRuleTest.java b/server/src/test/java/com/metamx/druid/master/rules/PeriodDropRuleTest.java
index ae0c7cedd2e..c6bceb08e5e 100644
--- a/server/src/test/java/com/metamx/druid/master/rules/PeriodDropRuleTest.java
+++ b/server/src/test/java/com/metamx/druid/master/rules/PeriodDropRuleTest.java
@@ -33,13 +33,13 @@ public class PeriodDropRuleTest
{
private final static DataSegment.Builder builder = DataSegment.builder()
.dataSource("test")
- .version(new DateTime().toString())
+ .version(new DateTime("2012-12-31T01:00:00").toString())
.shardSpec(new NoneShardSpec());
@Test
public void testAppliesToAll()
{
- DateTime now = new DateTime();
+ DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodDropRule rule = new PeriodDropRule(
new Period("P5000Y")
);
@@ -51,13 +51,15 @@ public class PeriodDropRuleTest
now.minusDays(2),
now.minusDays(1)
)
- ).build()
+ ).build(),
+ now
)
);
Assert.assertTrue(
rule.appliesTo(
builder.interval(new Interval(now.minusYears(100), now.minusDays(1)))
- .build()
+ .build(),
+ now
)
);
}
@@ -65,7 +67,7 @@ public class PeriodDropRuleTest
@Test
public void testAppliesToPeriod()
{
- DateTime now = new DateTime();
+ DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodDropRule rule = new PeriodDropRule(
new Period("P1M")
);
@@ -73,19 +75,29 @@ public class PeriodDropRuleTest
Assert.assertTrue(
rule.appliesTo(
builder.interval(new Interval(now.minusWeeks(1), now.minusDays(1)))
- .build()
+ .build(),
+ now
+ )
+ );
+ Assert.assertTrue(
+ rule.appliesTo(
+ builder.interval(new Interval(now.minusDays(1), now))
+ .build(),
+ now
)
);
Assert.assertFalse(
rule.appliesTo(
builder.interval(new Interval(now.minusYears(1), now.minusDays(1)))
- .build()
+ .build(),
+ now
)
);
Assert.assertFalse(
rule.appliesTo(
builder.interval(new Interval(now.minusMonths(2), now.minusDays(1)))
- .build()
+ .build(),
+ now
)
);
}
diff --git a/server/src/test/java/com/metamx/druid/master/rules/PeriodLoadRuleTest.java b/server/src/test/java/com/metamx/druid/master/rules/PeriodLoadRuleTest.java
index 283d684cb07..3944d96ecb9 100644
--- a/server/src/test/java/com/metamx/druid/master/rules/PeriodLoadRuleTest.java
+++ b/server/src/test/java/com/metamx/druid/master/rules/PeriodLoadRuleTest.java
@@ -39,38 +39,41 @@ public class PeriodLoadRuleTest
@Test
public void testAppliesToAll()
{
+ DateTime now = new DateTime("2013-01-01");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P5000Y"),
0,
""
);
- Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build()));
- Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build()));
- Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build()));
+ Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build(), now));
+ Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build(), now));
+ Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
}
@Test
public void testAppliesToPeriod()
{
- DateTime now = new DateTime();
+ DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P1M"),
0,
""
);
- Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build()));
+ Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build(), now));
Assert.assertTrue(
rule.appliesTo(
builder.interval(new Interval(now.minusDays(1), now.plusDays(1)))
- .build()
+ .build(),
+ now
)
);
Assert.assertFalse(
rule.appliesTo(
builder.interval(new Interval(now.plusDays(1), now.plusDays(2)))
- .build()
+ .build(),
+ now
)
);
}