Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Eric Tschetter 2013-01-04 11:28:35 -06:00
commit 4f5087b386
42 changed files with 434 additions and 145 deletions

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -21,6 +21,7 @@ package com.metamx.druid;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.druid.query.group.GroupByQuery; import com.metamx.druid.query.group.GroupByQuery;
import com.metamx.druid.query.metadata.SegmentMetadataQuery;
import com.metamx.druid.query.search.SearchQuery; import com.metamx.druid.query.search.SearchQuery;
import com.metamx.druid.query.segment.QuerySegmentSpec; import com.metamx.druid.query.segment.QuerySegmentSpec;
import com.metamx.druid.query.segment.QuerySegmentWalker; import com.metamx.druid.query.segment.QuerySegmentWalker;
@ -39,7 +40,8 @@ import java.util.Map;
@JsonSubTypes.Type(name = Query.TIMESERIES, value = TimeseriesQuery.class), @JsonSubTypes.Type(name = Query.TIMESERIES, value = TimeseriesQuery.class),
@JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class), @JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class),
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class), @JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
@JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class) @JsonSubTypes.Type(name = "groupBy", value= GroupByQuery.class),
@JsonSubTypes.Type(name = "segmentMetadata", value= SegmentMetadataQuery.class)
}) })
public interface Query<T> public interface Query<T>
{ {

View File

@ -83,7 +83,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
} }
}, },
toolChest.preMergeQueryDecoration(baseClient) toolChest.preMergeQueryDecoration(baseClient)
) ).withWaitMeasuredFromNow()
) )
), ),
toolChest toolChest

View File

@ -0,0 +1,72 @@
package com.metamx.druid.query;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MetricsEmittingExecutorService extends AbstractExecutorService
{
private final ExecutorService base;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;
public MetricsEmittingExecutorService(
ExecutorService base,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder
)
{
this.base = base;
this.emitter = emitter;
this.metricBuilder = metricBuilder;
}
@Override
public void shutdown()
{
base.shutdown();
}
@Override
public List<Runnable> shutdownNow()
{
return base.shutdownNow();
}
@Override
public boolean isShutdown()
{
return base.isShutdown();
}
@Override
public boolean isTerminated()
{
return base.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
{
return base.awaitTermination(l, timeUnit);
}
@Override
public void execute(Runnable runnable)
{
emitMetrics();
base.execute(runnable);
}
private void emitMetrics()
{
if (base instanceof ThreadPoolExecutor) {
emitter.emit(metricBuilder.build("exec/backlog", ((ThreadPoolExecutor) base).getQueue().size()));
}
}
}

View File

@ -37,16 +37,33 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn; private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner; private final QueryRunner<T> queryRunner;
private final long creationTime;
public MetricsEmittingQueryRunner( public MetricsEmittingQueryRunner(
ServiceEmitter emitter, ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn, Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner QueryRunner<T> queryRunner
) )
{
this(emitter, builderFn, queryRunner, -1);
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
long creationTime
)
{ {
this.emitter = emitter; this.emitter = emitter;
this.builderFn = builderFn; this.builderFn = builderFn;
this.queryRunner = queryRunner; this.queryRunner = queryRunner;
this.creationTime = creationTime;
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
} }
@Override @Override
@ -77,6 +94,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime; long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken)); emitter.emit(builder.build("query/time", timeTaken));
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
} }
return retVal; return retVal;
@ -149,6 +170,10 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
long timeTaken = System.currentTimeMillis() - startTime; long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build("query/time", timeTaken)); emitter.emit(builder.build("query/time", timeTaken));
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
yielder.close(); yielder.close();
} }
}; };

View File

@ -20,6 +20,7 @@
package com.metamx.druid.query; package com.metamx.druid.query;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -58,9 +59,12 @@ public class Queries
); );
for (PostAggregator postAgg : postAggs) { for (PostAggregator postAgg : postAggs) {
Set<String> dependencies = postAgg.getDependentFields();
Set<String> missing = Sets.difference(dependencies, combinedAggNames);
Preconditions.checkArgument( Preconditions.checkArgument(
postAgg.verifyFields(combinedAggNames), missing.isEmpty(),
String.format("Missing field[%s]", postAgg.getName()) "Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
); );
combinedAggNames.add(postAgg.getName()); combinedAggNames.add(postAgg.getName());
} }

View File

@ -43,8 +43,8 @@ import java.util.Map;
public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>> public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>>
{ {
public static final Interval MY_Y2K_INTERVAL = new Interval( public static final Interval MY_Y2K_INTERVAL = new Interval(
new DateTime(Long.MIN_VALUE), new DateTime("0000-01-01"),
new DateTime(Long.MAX_VALUE) new DateTime("3000-01-01")
); );
public static final String MAX_TIME = "maxTime"; public static final String MAX_TIME = "maxTime";
public static final String MIN_TIME = "minTime"; public static final String MIN_TIME = "minTime";

View File

@ -24,6 +24,8 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
private Cache cache; private Cache cache;
private static byte[] randBytes; private static byte[] randBytes;
@Param({"localhost:11211"}) String hosts;
// object size in kB // object size in kB
@Param({"1", "5", "10", "40"}) int objectSize; @Param({"1", "5", "10", "40"}) int objectSize;
@Param({"100", "1000"}) int objectCount; @Param({"100", "1000"}) int objectCount;
@ -37,6 +39,8 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
// disable compression // disable compression
transcoder.setCompressionThreshold(Integer.MAX_VALUE); transcoder.setCompressionThreshold(Integer.MAX_VALUE);
System.out.println(String.format("Using memcached hosts [%s]", hosts));
client = new MemcachedClient( client = new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH) .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
@ -46,7 +50,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
.setTranscoder(transcoder) .setTranscoder(transcoder)
.setShouldOptimize(true) .setShouldOptimize(true)
.build(), .build(),
AddrUtil.getAddresses("localhost:11211") AddrUtil.getAddresses(hosts)
); );
broker = new MemcachedCacheBroker( broker = new MemcachedCacheBroker(

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -20,6 +20,7 @@
package com.metamx.druid.aggregation.post; package com.metamx.druid.aggregation.post;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
@ -69,14 +70,13 @@ public class ArithmeticPostAggregator implements PostAggregator
} }
@Override @Override
public boolean verifyFields(Set<String> fieldNames) public Set<String> getDependentFields()
{ {
Set<String> dependentFields = Sets.newHashSet();
for (PostAggregator field : fields) { for (PostAggregator field : fields) {
if (!field.verifyFields(fieldNames)) { dependentFields.addAll(field.getDependentFields());
return false;
} }
} return dependentFields;
return true;
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package com.metamx.druid.aggregation.post; package com.metamx.druid.aggregation.post;
import com.google.common.collect.Sets;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
@ -44,9 +45,9 @@ public class ConstantPostAggregator implements PostAggregator
} }
@Override @Override
public boolean verifyFields(Set<String> fields) public Set<String> getDependentFields()
{ {
return true; return Sets.newHashSet();
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package com.metamx.druid.aggregation.post; package com.metamx.druid.aggregation.post;
import com.google.common.collect.Sets;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
@ -45,9 +46,9 @@ public class FieldAccessPostAggregator implements PostAggregator
} }
@Override @Override
public boolean verifyFields(Set<String> fieldNames) public Set<String> getDependentFields()
{ {
return fieldNames.contains(fieldName); return Sets.newHashSet(fieldName);
} }
@Override @Override

View File

@ -37,7 +37,7 @@ import java.util.Set;
}) })
public interface PostAggregator public interface PostAggregator
{ {
public boolean verifyFields(Set<String> fieldNames); public Set<String> getDependentFields();
public Comparator getComparator(); public Comparator getComparator();

View File

@ -181,7 +181,7 @@ public class QueryGranularityTest
); );
} }
//@Test @Test
public void testPeriodDaylightSaving() throws Exception public void testPeriodDaylightSaving() throws Exception
{ {
final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); final DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles");

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId> <artifactId>druid-services</artifactId>
<name>druid-services</name> <name>druid-services</name>
<description>druid-services</description> <description>druid-services</description>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<modules> <modules>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid-examples</artifactId> <artifactId>druid-examples</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -211,10 +211,10 @@ public class RemoteTaskRunner implements TaskRunner
} else { } else {
Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert( log.makeAlert("Worker node termination taking too long")
"It has been %d millis since last scheduled termination but nodes remain", .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
durSinceLastTerminate.getMillis() .addData("terminatingCount", currentlyTerminating.size())
).emit(); .emit();
} }
log.info( log.info(
@ -330,7 +330,9 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Registering retry for failed task[%s]", task.getId()); log.info("Registering retry for failed task[%s]", task.getId());
if (retryPolicy.hasExceededRetryThreshold()) { if (retryPolicy.hasExceededRetryThreshold()) {
log.makeAlert("Task [%s] has failed[%d] times, giving up!", task.getId(), retryPolicy.getNumRetries()) log.makeAlert("Task exceeded maximum retry count")
.addData("task", task.getId())
.addData("retryCount", retryPolicy.getNumRetries())
.emit(); .emit();
return; return;
} }
@ -542,10 +544,10 @@ public class RemoteTaskRunner implements TaskRunner
} else { } else {
Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
log.makeAlert( log.makeAlert("Worker node provisioning taking too long")
"It has been %d millis since last scheduled provision but nodes remain", .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
durSinceLastProvision.getMillis() .addData("provisioningCount", currentlyProvisioning.size())
).emit(); .emit();
} }
log.info( log.info(

View File

@ -20,11 +20,9 @@
package com.metamx.druid.merger.coordinator.exec; package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
@ -35,12 +33,9 @@ import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner; import com.metamx.druid.merger.coordinator.TaskRunner;
import com.metamx.druid.merger.coordinator.VersionedTaskWrapper; import com.metamx.druid.merger.coordinator.VersionedTaskWrapper;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.concurrent.ExecutorService;
public class TaskConsumer implements Runnable public class TaskConsumer implements Runnable
{ {
private final TaskQueue queue; private final TaskQueue queue;
@ -242,17 +237,12 @@ public class TaskConsumer implements Runnable
emitter.emit(builder.build("indexer/segment/bytes", bytes)); emitter.emit(builder.build("indexer/segment/bytes", bytes));
if (status.isFailure()) { if (status.isFailure()) {
emitter.emit( log.makeAlert("Failed to index")
new AlertEvent.Builder().build( .addData("task", task.getId())
String.format("Failed to index: %s", task.getDataSource()), .addData("type", task.getType().toString())
ImmutableMap.<String, Object>builder() .addData("dataSource", task.getDataSource())
.put("task", task.getId()) .addData("interval", task.getInterval())
.put("type", task.getType().toString()) .emit();
.put("dataSource", task.getDataSource())
.put("interval", task.getInterval())
.build()
)
);
} }
log.info( log.info(

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<properties> <properties>

View File

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.1.15-SNAPSHOT</version> <version>0.1.26-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

View File

@ -336,7 +336,7 @@ public class ServerManager implements QuerySegmentWalker
adapter.getInterval().getStart(), adapter.getInterval().getStart(),
factory.createRunner(adapter) factory.createRunner(adapter)
) )
), ).withWaitMeasuredFromNow(),
segmentSpec segmentSpec
); );
} }

View File

@ -40,9 +40,11 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.QueryableLoaderConfig; import com.metamx.druid.loading.QueryableLoaderConfig;
import com.metamx.druid.loading.StorageAdapterLoader; import com.metamx.druid.loading.StorageAdapterLoader;
import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor; import com.metamx.metrics.Monitor;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.smile.SmileFactory; import org.codehaus.jackson.smile.SmileFactory;
@ -118,13 +120,16 @@ public class ComputeNode extends BaseServerNode<ComputeNode>
final List<Monitor> monitors = getMonitors(); final List<Monitor> monitors = getMonitors();
final QueryRunnerFactoryConglomerate conglomerate = getConglomerate(); final QueryRunnerFactoryConglomerate conglomerate = getConglomerate();
final ExecutorService executorService = ExecutorServices.create( final ExecutorService executorService = new MetricsEmittingExecutorService(
ExecutorServices.create(
getLifecycle(), getLifecycle(),
getConfigFactory().buildWithReplacements( getConfigFactory().buildWithReplacements(
ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing") ExecutorServiceConfig.class, ImmutableMap.of("base_path", "druid.processing")
) )
), emitter, new ServiceMetricEvent.Builder()
); );
ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ServerManager serverManager = new ServerManager(adapterLoader, conglomerate, emitter, executorService);
final ZkCoordinator coordinator = new ZkCoordinator( final ZkCoordinator coordinator = new ZkCoordinator(
getJsonMapper(), getJsonMapper(),

View File

@ -33,7 +33,6 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.master.rules.Rule; import com.metamx.druid.master.rules.Rule;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -47,6 +46,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -556,7 +556,7 @@ public class InfoResource
return builder.entity(databaseSegmentManager.getInventory()).build(); return builder.entity(databaseSegmentManager.getInventory()).build();
} }
return builder.entity( List<String> dataSourceNames = Lists.newArrayList(
Iterables.transform( Iterables.transform(
databaseSegmentManager.getInventory(), databaseSegmentManager.getInventory(),
new Function<DruidDataSource, String>() new Function<DruidDataSource, String>()
@ -568,7 +568,11 @@ public class InfoResource
} }
} }
) )
).build(); );
Collections.sort(dataSourceNames);
return builder.entity(dataSourceNames).build();
} }
@GET @GET

View File

@ -23,6 +23,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.Rule; import com.metamx.druid.master.rules.Rule;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.util.List; import java.util.List;
@ -51,13 +52,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
} }
// Run through all matched rules for available segments // Run through all matched rules for available segments
DateTime now = new DateTime();
DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager(); DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
for (DataSegment segment : params.getAvailableSegments()) { for (DataSegment segment : params.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource()); List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false; boolean foundMatchingRule = false;
for (Rule rule : rules) { for (Rule rule : rules) {
if (rule.appliesTo(segment)) { if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(master, params, segment)); stats.accumulate(rule.run(master, params, segment));
foundMatchingRule = true; foundMatchingRule = true;
break; break;

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import java.util.Map; import java.util.Map;
@ -37,51 +36,72 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster) public static SegmentReplicantLookup make(DruidCluster cluster)
{ {
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create(); final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) { for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
for (ServerHolder serverHolder : serversByType) { for (ServerHolder serverHolder : serversByType) {
DruidServer server = serverHolder.getServer(); DruidServer server = serverHolder.getServer();
for (DruidDataSource dataSource : server.getDataSources()) { for (DataSegment segment : server.getSegments().values()) {
for (DataSegment segment : dataSource.getSegments()) {
Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier()); Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) { if (numReplicants == null) {
numReplicants = 0; numReplicants = 0;
} }
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
} }
}
// Also account for queued segments // Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) { for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = segmentsInCluster.get(segment.getIdentifier(), server.getTier()); Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) { if (numReplicants == null) {
numReplicants = 0; numReplicants = 0;
} }
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
} }
} }
} }
return new SegmentReplicantLookup(segmentsInCluster); return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
} }
private final Table<String, String, Integer> table; private final Table<String, String, Integer> segmentsInCluster;
private final Table<String, String, Integer> loadingSegments;
private SegmentReplicantLookup(Table<String, String, Integer> table) private SegmentReplicantLookup(
Table<String, String, Integer> segmentsInCluster,
Table<String, String, Integer> loadingSegments
)
{ {
this.table = table; this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
} }
public Map<String, Integer> getTiers(String segmentId) public Map<String, Integer> getClusterTiers(String segmentId)
{ {
Map<String, Integer> retVal = table.row(segmentId); Map<String, Integer> retVal = segmentsInCluster.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal; return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
} }
public int lookup(String segmentId, String tier) public Map<String, Integer> getLoadingTiers(String segmentId)
{ {
Integer retVal = table.get(segmentId, tier); Map<String, Integer> retVal = loadingSegments.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}
public int getClusterReplicants(String segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal; return (retVal == null) ? 0 : retVal;
} }
public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}
public int getTotalReplicants(String segmentId, String tier)
{
return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
} }

View File

@ -94,9 +94,14 @@ public class ServerHolder implements Comparable<ServerHolder>
return availableSize; return availableSize;
} }
public boolean containsSegment(DataSegment segment) public boolean isServingSegment(DataSegment segment)
{ {
return (server.getSegment(segment.getIdentifier()) != null || peon.getSegmentsToLoad().contains(segment)); return (server.getSegment(segment.getIdentifier()) != null);
}
public boolean isLoadingSegment(DataSegment segment)
{
return peon.getSegmentsToLoad().contains(segment);
} }
@Override @Override

View File

@ -22,6 +22,7 @@ package com.metamx.druid.master.rules;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
/** /**
@ -52,7 +53,7 @@ public class IntervalDropRule extends DropRule
} }
@Override @Override
public boolean appliesTo(DataSegment segment) public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{ {
return interval.contains(segment.getInterval()); return interval.contains(segment.getInterval());
} }

View File

@ -23,6 +23,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
/** /**
@ -81,7 +82,7 @@ public class IntervalLoadRule extends LoadRule
} }
@Override @Override
public boolean appliesTo(DataSegment segment) public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{ {
return interval.contains(segment.getInterval()); return interval.contains(segment.getInterval());
} }

View File

@ -22,7 +22,6 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams; import com.metamx.druid.master.DruidMasterRuntimeParams;
@ -47,7 +46,8 @@ public abstract class LoadRule implements Rule
MasterStats stats = new MasterStats(); MasterStats stats = new MasterStats();
int expectedReplicants = getReplicants(); int expectedReplicants = getReplicants();
int actualReplicants = params.getSegmentReplicantLookup().lookup(segment.getIdentifier(), getTier()); int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier());
int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier());
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier()); MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) { if (serverQueue == null) {
@ -55,15 +55,15 @@ public abstract class LoadRule implements Rule
return stats; return stats;
} }
stats.accumulate(assign(expectedReplicants, actualReplicants, serverQueue, segment)); stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
stats.accumulate(drop(expectedReplicants, actualReplicants, segment, params)); stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
return stats; return stats;
} }
private MasterStats assign( private MasterStats assign(
int expectedReplicants, int expectedReplicants,
int actualReplicants, int totalReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue, MinMaxPriorityQueue<ServerHolder> serverQueue,
DataSegment segment DataSegment segment
) )
@ -71,7 +71,7 @@ public abstract class LoadRule implements Rule
MasterStats stats = new MasterStats(); MasterStats stats = new MasterStats();
List<ServerHolder> assignedServers = Lists.newArrayList(); List<ServerHolder> assignedServers = Lists.newArrayList();
while (actualReplicants < expectedReplicants) { while (totalReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst(); ServerHolder holder = serverQueue.pollFirst();
if (holder == null) { if (holder == null) {
log.warn( log.warn(
@ -83,7 +83,8 @@ public abstract class LoadRule implements Rule
); );
break; break;
} }
if (holder.containsSegment(segment)) { if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) {
assignedServers.add(holder);
continue; continue;
} }
@ -121,7 +122,7 @@ public abstract class LoadRule implements Rule
assignedServers.add(holder); assignedServers.add(holder);
stats.addToTieredStat("assignedCount", getTier(), 1); stats.addToTieredStat("assignedCount", getTier(), 1);
++actualReplicants; ++totalReplicants;
} }
serverQueue.addAll(assignedServers); serverQueue.addAll(assignedServers);
@ -130,7 +131,7 @@ public abstract class LoadRule implements Rule
private MasterStats drop( private MasterStats drop(
int expectedReplicants, int expectedReplicants,
int actualReplicants, int clusterReplicants,
DataSegment segment, DataSegment segment,
DruidMasterRuntimeParams params DruidMasterRuntimeParams params
) )
@ -142,11 +143,11 @@ public abstract class LoadRule implements Rule
} }
// Make sure we have enough actual replicants in the cluster before doing anything // Make sure we have enough actual replicants in the cluster before doing anything
if (actualReplicants < expectedReplicants) { if (clusterReplicants < expectedReplicants) {
return stats; return stats;
} }
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getTiers(segment.getIdentifier()); Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) { for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey(); String tier = entry.getKey();
@ -163,10 +164,11 @@ public abstract class LoadRule implements Rule
while (actualNumReplicantsForType > expectedNumReplicantsForType) { while (actualNumReplicantsForType > expectedNumReplicantsForType) {
ServerHolder holder = serverQueue.pollLast(); ServerHolder holder = serverQueue.pollLast();
if (holder == null) { if (holder == null) {
log.warn("Wtf, holder was null? Do I have no servers[%s]?", serverQueue); log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
continue; break;
} }
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment( holder.getPeon().dropSegment(
segment, segment,
new LoadPeonCallback() new LoadPeonCallback()
@ -177,10 +179,11 @@ public abstract class LoadRule implements Rule
} }
} }
); );
droppedServers.add(holder);
--actualNumReplicantsForType; --actualNumReplicantsForType;
stats.addToTieredStat("droppedCount", tier, 1); stats.addToTieredStat("droppedCount", tier, 1);
} }
droppedServers.add(holder);
}
serverQueue.addAll(droppedServers); serverQueue.addAll(droppedServers);
} }

View File

@ -54,9 +54,9 @@ public class PeriodDropRule extends DropRule
} }
@Override @Override
public boolean appliesTo(DataSegment segment) public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{ {
final Interval currInterval = new Interval(new DateTime().minus(period), period); final Interval currInterval = new Interval(period, referenceTimestamp);
return currInterval.contains(segment.getInterval()); return currInterval.contains(segment.getInterval());
} }
} }

View File

@ -81,9 +81,9 @@ public class PeriodLoadRule extends LoadRule
} }
@Override @Override
public boolean appliesTo(DataSegment segment) public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{ {
final Interval currInterval = new Interval(new DateTime().minus(period), period); final Interval currInterval = new Interval(period, referenceTimestamp);
return currInterval.overlaps(segment.getInterval()); return currInterval.overlaps(segment.getInterval());
} }
} }

View File

@ -25,6 +25,7 @@ import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.MasterStats; import com.metamx.druid.master.MasterStats;
import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo; import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.joda.time.DateTime;
/** /**
*/ */
@ -40,7 +41,7 @@ public interface Rule
{ {
public String getType(); public String getType();
public boolean appliesTo(DataSegment segment); public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp);
public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment); public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, DataSegment segment);
} }

View File

@ -72,7 +72,7 @@ public class SegmentMetadataQueryEngine
final Indexed<String> lookup = adapter.getDimValueLookup(input); final Indexed<String> lookup = adapter.getDimValueLookup(input);
for (String dimVal : lookup) { for (String dimVal : lookup) {
ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal); ImmutableConciseSet index = adapter.getInvertedIndex(input, dimVal);
size += index.size() * Charsets.UTF_8.encode(dimVal).capacity(); size += (dimVal == null) ? 0 : index.size() * Charsets.UTF_8.encode(dimVal).capacity();
} }
return new SegmentMetadataResultValue.Dimension( return new SegmentMetadataResultValue.Dimension(
size, size,

View File

@ -31,7 +31,6 @@ import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.rules.IntervalDropRule; import com.metamx.druid.master.rules.IntervalDropRule;
import com.metamx.druid.master.rules.IntervalLoadRule; import com.metamx.druid.master.rules.IntervalLoadRule;
import com.metamx.druid.master.rules.Rule; import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.master.rules.RuleMap;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
@ -86,14 +85,6 @@ public class DruidMasterRuleRunnerTest
} }
ruleRunner = new DruidMasterRuleRunner(master); ruleRunner = new DruidMasterRuleRunner(master);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().anyTimes();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
} }
@After @After
@ -113,6 +104,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testRunThreeTiersOneReplicant() throws Exception public void testRunThreeTiersOneReplicant() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
@ -202,6 +199,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testRunTwoTiersTwoReplicants() throws Exception public void testRunTwoTiersTwoReplicants() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
@ -284,6 +287,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testRunTwoTiersWithExistingSegments() throws Exception public void testRunTwoTiersWithExistingSegments() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -356,6 +365,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testRunTwoTiersTierDoesNotExist() throws Exception public void testRunTwoTiersTierDoesNotExist() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject()); emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(12); EasyMock.expectLastCall().times(12);
EasyMock.replay(emitter); EasyMock.replay(emitter);
@ -455,6 +470,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testDropRemove() throws Exception public void testDropRemove() throws Exception
{ {
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
master.removeSegment(EasyMock.<DataSegment>anyObject()); master.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master); EasyMock.replay(master);
@ -513,6 +534,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testDropTooManyInSameTier() throws Exception public void testDropTooManyInSameTier() throws Exception
{ {
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
@ -581,6 +608,14 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testDropTooManyInDifferentTiers() throws Exception public void testDropTooManyInDifferentTiers() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -653,6 +688,12 @@ public class DruidMasterRuleRunnerTest
@Test @Test
public void testDontDropInDifferentTiers() throws Exception public void testDontDropInDifferentTiers() throws Exception
{ {
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn( EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList( Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
@ -716,5 +757,95 @@ public class DruidMasterRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
}
@Test
public void testDropServerActuallyServesSegment() throws Exception
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidServer server1 = new DruidServer(
"server1",
"host1",
1000,
"historical",
"normal"
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
);
server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
DruidServer server3 = new DruidServer(
"serverNorm3",
"hostNorm3",
1000,
"historical",
"normal"
);
server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.replay(anotherMockPeon);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
),
new ServerHolder(
server2,
anotherMockPeon
),
new ServerHolder(
server3,
anotherMockPeon
)
)
)
)
);
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withMillisToWaitBeforeDeleting(0L)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
MasterStats stats = afterParams.getMasterStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
} }
} }

View File

@ -33,13 +33,13 @@ public class PeriodDropRuleTest
{ {
private final static DataSegment.Builder builder = DataSegment.builder() private final static DataSegment.Builder builder = DataSegment.builder()
.dataSource("test") .dataSource("test")
.version(new DateTime().toString()) .version(new DateTime("2012-12-31T01:00:00").toString())
.shardSpec(new NoneShardSpec()); .shardSpec(new NoneShardSpec());
@Test @Test
public void testAppliesToAll() public void testAppliesToAll()
{ {
DateTime now = new DateTime(); DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodDropRule rule = new PeriodDropRule( PeriodDropRule rule = new PeriodDropRule(
new Period("P5000Y") new Period("P5000Y")
); );
@ -51,13 +51,15 @@ public class PeriodDropRuleTest
now.minusDays(2), now.minusDays(2),
now.minusDays(1) now.minusDays(1)
) )
).build() ).build(),
now
) )
); );
Assert.assertTrue( Assert.assertTrue(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.minusYears(100), now.minusDays(1))) builder.interval(new Interval(now.minusYears(100), now.minusDays(1)))
.build() .build(),
now
) )
); );
} }
@ -65,7 +67,7 @@ public class PeriodDropRuleTest
@Test @Test
public void testAppliesToPeriod() public void testAppliesToPeriod()
{ {
DateTime now = new DateTime(); DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodDropRule rule = new PeriodDropRule( PeriodDropRule rule = new PeriodDropRule(
new Period("P1M") new Period("P1M")
); );
@ -73,19 +75,29 @@ public class PeriodDropRuleTest
Assert.assertTrue( Assert.assertTrue(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.minusWeeks(1), now.minusDays(1))) builder.interval(new Interval(now.minusWeeks(1), now.minusDays(1)))
.build() .build(),
now
)
);
Assert.assertTrue(
rule.appliesTo(
builder.interval(new Interval(now.minusDays(1), now))
.build(),
now
) )
); );
Assert.assertFalse( Assert.assertFalse(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.minusYears(1), now.minusDays(1))) builder.interval(new Interval(now.minusYears(1), now.minusDays(1)))
.build() .build(),
now
) )
); );
Assert.assertFalse( Assert.assertFalse(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.minusMonths(2), now.minusDays(1))) builder.interval(new Interval(now.minusMonths(2), now.minusDays(1)))
.build() .build(),
now
) )
); );
} }

View File

@ -39,38 +39,41 @@ public class PeriodLoadRuleTest
@Test @Test
public void testAppliesToAll() public void testAppliesToAll()
{ {
DateTime now = new DateTime("2013-01-01");
PeriodLoadRule rule = new PeriodLoadRule( PeriodLoadRule rule = new PeriodLoadRule(
new Period("P5000Y"), new Period("P5000Y"),
0, 0,
"" ""
); );
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build())); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("2012-01-01/2012-12-31")).build(), now));
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build())); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("1000-01-01/2012-12-31")).build(), now));
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build())); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval("0500-01-01/2100-12-31")).build(), now));
} }
@Test @Test
public void testAppliesToPeriod() public void testAppliesToPeriod()
{ {
DateTime now = new DateTime(); DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodLoadRule rule = new PeriodLoadRule( PeriodLoadRule rule = new PeriodLoadRule(
new Period("P1M"), new Period("P1M"),
0, 0,
"" ""
); );
Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build())); Assert.assertTrue(rule.appliesTo(builder.interval(new Interval(now.minusWeeks(1), now)).build(), now));
Assert.assertTrue( Assert.assertTrue(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.minusDays(1), now.plusDays(1))) builder.interval(new Interval(now.minusDays(1), now.plusDays(1)))
.build() .build(),
now
) )
); );
Assert.assertFalse( Assert.assertFalse(
rule.appliesTo( rule.appliesTo(
builder.interval(new Interval(now.plusDays(1), now.plusDays(2))) builder.interval(new Interval(now.plusDays(1), now.plusDays(2)))
.build() .build(),
now
) )
); );
} }