diff --git a/client/pom.xml b/client/pom.xml index 9917a339787..bf428cf9c31 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index b4b900311dd..25e9713082c 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index d514c32514d..0ac7ac24edc 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 350660500d1..2041b23753f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 0e186d40b83..e8972cf8ae3 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index f5a8cc28141..16dae001fb9 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index a270b9cca22..5ca9cceea5d 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index caa8c0c52bf..fadfa67b486 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 64beb8d6b84..32effdb03e1 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 2e53c699e58..0892328db1c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -179,22 +179,19 @@ public class RemoteTaskRunner implements TaskRunner return; } - List thoseLazyWorkers = Lists.newArrayList( - FunctionalIterable - .create(zkWorkers.values()) - .filter( - new Predicate() - { - @Override - public boolean apply(WorkerWrapper input) - { - return input.getRunningTasks().isEmpty() - && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() - > config.getMaxWorkerIdleTimeMillisBeforeDeletion(); - } - } - ) - ); + int workerCount = 0; + List thoseLazyWorkers = Lists.newArrayList(); + for (WorkerWrapper workerWrapper : zkWorkers.values()) { + workerCount++; + + if (workerCount > workerSetupManager.getWorkerSetupData().getMinNumWorkers() && + workerWrapper.getRunningTasks().isEmpty() && + System.currentTimeMillis() - workerWrapper.getLastCompletedTaskTime().getMillis() + > config.getMaxWorkerIdleTimeMillisBeforeDeletion() + ) { + thoseLazyWorkers.add(workerWrapper); + } + } AutoScalingData terminated = strategy.terminate( Lists.transform( diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java index c9badf7ef88..44b3a1d4c8c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/RemoteTaskRunnerConfig.java @@ -38,7 +38,7 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig public abstract DateTime getTerminateResourcesOriginDateTime(); @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") - @Default("10000") + @Default("600000") public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); @Config("druid.indexer.maxScalingDuration") diff --git a/pom.xml b/pom.xml index 4310add56c6..b29e612de39 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 5cf54f122d2..a4777cb0618 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index 354aecaee60..22dbcc83e27 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.2.0-SNAPSHOT + 0.2.3-SNAPSHOT diff --git a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java index ef5ecef5d56..bf8e0ac9653 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java @@ -215,7 +215,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter while (baseIter.hasNext()) { currEntry.set(baseIter.next()); if (filterMatcher.matches()) { - break; + return; } numAdvanced++; diff --git a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java index 7f630d32b96..6e86c9e1022 100644 --- a/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/query/group/GroupByQueryRunnerTest.java @@ -22,6 +22,7 @@ package com.metamx.druid.query.group; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,6 +42,7 @@ import com.metamx.druid.query.dimension.DefaultDimensionSpec; import com.metamx.druid.query.dimension.DimensionSpec; import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Test; @@ -150,6 +152,70 @@ public class GroupByQueryRunnerTest TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithTimeZone() { + DateTimeZone tz = DateTimeZone.forID("America/Los_Angeles"); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-03-31T00:00:00-07:00/2011-04-02T00:00:00-07:00") + .setDimensions( + Lists.newArrayList( + (DimensionSpec) new DefaultDimensionSpec( + "quality", + "alias" + ) + ) + ) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory( + "idx", + "index" + ) + ) + ) + .setGranularity( + new PeriodGranularity( + new Period("P1D"), + null, + tz + ) + ) + .build(); + + List expectedResults = Arrays.asList( + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "automotive", "rows", 1L, "idx", 135L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "business", "rows", 1L, "idx", 118L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "entertainment", "rows", 1L, "idx", 158L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "health", "rows", 1L, "idx", 120L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "mezzanine", "rows", 3L, "idx", 2870L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "news", "rows", 1L, "idx", 121L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "premium", "rows", 3L, "idx", 2900L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "technology", "rows", 1L, "idx", 78L)), + (Row) new MapBasedRow(new DateTime("2011-03-31", tz),ImmutableMap.of("alias", "travel", "rows", 1L, "idx", 119L)), + + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "automotive", "rows", 1L, "idx", 147L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "business", "rows", 1L, "idx", 112L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "entertainment", "rows", 1L, "idx", 166L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "health", "rows", 1L, "idx", 113L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "mezzanine", "rows", 3L, "idx", 2447L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "news", "rows", 1L, "idx", 114L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "premium", "rows", 3L, "idx", 2505L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "technology", "rows", 1L, "idx", 97L)), + (Row) new MapBasedRow(new DateTime("2011-04-01", tz),ImmutableMap.of("alias", "travel", "rows", 1L, "idx", 126L)) + ); + + Iterable results = Sequences.toList( + runner.run(query), + Lists.newArrayList() + ); + + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test public void testMergeResults() { GroupByQuery.Builder builder = GroupByQuery diff --git a/server/src/test/java/com/metamx/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/server/src/test/java/com/metamx/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java new file mode 100644 index 00000000000..91865f47c7d --- /dev/null +++ b/server/src/test/java/com/metamx/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -0,0 +1,105 @@ +package com.metamx.druid.query.timeseries; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.guava.Sequences; +import com.metamx.druid.Druids; +import com.metamx.druid.Query; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.index.IncrementalIndexSegment; +import com.metamx.druid.index.Segment; +import com.metamx.druid.index.v1.IncrementalIndex; +import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.query.FinalizeResultsQueryRunner; +import com.metamx.druid.query.QueryRunner; +import com.metamx.druid.query.QueryRunnerFactory; +import com.metamx.druid.result.Result; +import com.metamx.druid.result.TimeseriesResultValue; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +import java.util.List; + +public class TimeseriesQueryRunnerBonusTest +{ + @Test + public void testOneRowAtATime() throws Exception + { + final IncrementalIndex oneRowIndex = new IncrementalIndex( + new DateTime("2012-01-01T00:00:00Z").getMillis(), QueryGranularity.NONE, new AggregatorFactory[]{} + ); + + List> results; + + oneRowIndex.add( + new MapBasedInputRow( + new DateTime("2012-01-01T00:00:00Z").getMillis(), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "x") + ) + ); + + results = runTimeseriesCount(oneRowIndex); + + Assert.assertEquals("index size", 1, oneRowIndex.size()); + Assert.assertEquals("result size", 1, results.size()); + Assert.assertEquals("result timestamp", new DateTime("2012-01-01T00:00:00Z"), results.get(0).getTimestamp()); + Assert.assertEquals("result count metric", 1, (long) results.get(0).getValue().getLongMetric("rows")); + + oneRowIndex.add( + new MapBasedInputRow( + new DateTime("2012-01-01T00:00:00Z").getMillis(), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "y") + ) + ); + + results = runTimeseriesCount(oneRowIndex); + + Assert.assertEquals("index size", 2, oneRowIndex.size()); + Assert.assertEquals("result size", 1, results.size()); + Assert.assertEquals("result timestamp", new DateTime("2012-01-01T00:00:00Z"), results.get(0).getTimestamp()); + Assert.assertEquals("result count metric", 2, (long) results.get(0).getValue().getLongMetric("rows")); + } + + private static List> runTimeseriesCount(IncrementalIndex index) + { + final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); + final QueryRunner> runner = makeQueryRunner( + factory, + new IncrementalIndexSegment(index) + ); + + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("xxx") + .granularity(QueryGranularity.ALL) + .intervals(ImmutableList.of(new Interval("2012-01-01T00:00:00Z/P1D"))) + .aggregators( + ImmutableList.of( + new CountAggregatorFactory("rows") + ) + ) + .build(); + + return Sequences.toList( + runner.run(query), + Lists.>newArrayList() + ); + } + + private static QueryRunner makeQueryRunner( + QueryRunnerFactory> factory, + Segment adapter + ) + { + return new FinalizeResultsQueryRunner( + factory.createRunner(adapter), + factory.getToolchest() + ); + } +}