From 3884ebbc3ea4a04fb6f90973c383a201726b7aeb Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 16:41:41 -0700 Subject: [PATCH 01/19] ForkingTaskRunner: Fix properties loading --- .../com/metamx/druid/merger/coordinator/ForkingTaskRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java index b6ebdf4276f..0f6c8697e69 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java @@ -159,7 +159,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider String.format( "-D%s=%s", propName.substring(CHILD_PROPERTY_PREFIX.length()), - System.getProperty(propName) + props.getProperty(propName) ) ); } From eb8042917778f7246c5970b583bfd574816a7244 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 16:43:47 -0700 Subject: [PATCH 02/19] [maven-release-plugin] prepare release druid-0.3.36 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index 14dfcc2a0f5..f325797708a 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/common/pom.xml b/common/pom.xml index 96c2f54c25f..fb09dda3988 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index d14c5a04795..5a4ec557dad 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.36-SNAPSHOT + 0.3.36 com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/examples/pom.xml b/examples/pom.xml index 2108ba2fb9a..583140bf22e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 528195ca434..7d9efd354d7 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 9aefee12d58..ed638dba153 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/index-common/pom.xml b/index-common/pom.xml index 4f8f910a07e..8513df865d0 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/indexer/pom.xml b/indexer/pom.xml index e839dabbba0..118eed0c28d 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/merger/pom.xml b/merger/pom.xml index 580c32aec65..f6d631740a4 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/pom.xml b/pom.xml index 1b831c817eb..c9e7423aff1 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.36-SNAPSHOT + 0.3.36 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 2149258d599..d34bcf03998 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 diff --git a/server/pom.xml b/server/pom.xml index eb6455a487f..07993f6f1e2 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36-SNAPSHOT + 0.3.36 From 128b8730225ea6050f5ce7f3c20343ad75c69e9f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 16:43:53 -0700 Subject: [PATCH 03/19] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index f325797708a..ece5afe2afd 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index fb09dda3988..0bdde9af3af 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 5a4ec557dad..279267e0860 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.36 + 0.3.37-SNAPSHOT com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 583140bf22e..e7b9bed3492 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 7d9efd354d7..78756dab24d 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index ed638dba153..84b30034876 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 8513df865d0..cad48de1c93 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index 118eed0c28d..69b48632991 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index f6d631740a4..c25038ee0dc 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/pom.xml b/pom.xml index c9e7423aff1..77dec29a00a 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.36 + 0.3.37-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index d34bcf03998..61a7ffb1e07 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index 07993f6f1e2..523b5338212 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.36 + 0.3.37-SNAPSHOT From cafbcdb899bea90d29b964f550421b4fe0e74b0d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:15:33 -0700 Subject: [PATCH 04/19] Merger: Tweaks to task tracking --- .../ExecutorServiceTaskRunner.java | 50 ++++++++++--------- .../merger/coordinator/RemoteTaskRunner.java | 4 +- .../coordinator/TaskRunnerWorkItem.java | 14 ++++-- 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java index 512df6d4222..23fdf851c7d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ExecutorServiceTaskRunner.java @@ -23,6 +23,8 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -81,7 +83,28 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker public ListenableFuture run(final Task task) { final TaskToolbox toolbox = toolboxFactory.build(task); - return exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox)); + final ListenableFuture statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox)); + + final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime()); + runningItems.add(taskRunnerWorkItem); + Futures.addCallback( + statusFuture, new FutureCallback() + { + @Override + public void onSuccess(TaskStatus result) + { + runningItems.remove(taskRunnerWorkItem); + } + + @Override + public void onFailure(Throwable t) + { + runningItems.remove(taskRunnerWorkItem); + } + } + ); + + return statusFuture; } @Override @@ -97,34 +120,13 @@ public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker @Override public Collection getRunningTasks() { - return runningItems; + return ImmutableList.copyOf(runningItems); } @Override public Collection getPendingTasks() { - if (exec instanceof ThreadPoolExecutor) { - ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; - - return Lists.newArrayList( - FunctionalIterable.create(tpe.getQueue()) - .keep( - new Function() - { - @Override - public TaskRunnerWorkItem apply(Runnable input) - { - if (input instanceof ExecutorServiceTaskRunnerCallable) { - return ((ExecutorServiceTaskRunnerCallable) input).getTaskRunnerWorkItem(); - } - return null; - } - } - ) - ); - } - - return Lists.newArrayList(); + return ImmutableList.of(); } @Override diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 9284a946773..2772eba3abf 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -573,9 +573,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (taskStatus.isComplete()) { if (taskRunnerWorkItem != null) { - final SettableFuture result = taskRunnerWorkItem.getResult(); + final ListenableFuture result = taskRunnerWorkItem.getResult(); if (result != null) { - result.set(taskStatus); + ((SettableFuture) result).set(taskStatus); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java index d799c23e303..d1b3ccbd403 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunnerWorkItem.java @@ -20,6 +20,9 @@ package com.metamx.druid.merger.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.druid.merger.common.RetryPolicy; import com.metamx.druid.merger.common.TaskStatus; @@ -33,7 +36,7 @@ import org.joda.time.DateTimeComparator; public class TaskRunnerWorkItem implements Comparable { private final Task task; - private final SettableFuture result; + private final ListenableFuture result; private final RetryPolicy retryPolicy; private final DateTime createdTime; @@ -41,7 +44,7 @@ public class TaskRunnerWorkItem implements Comparable public TaskRunnerWorkItem( Task task, - SettableFuture result, + ListenableFuture result, RetryPolicy retryPolicy, DateTime createdTime ) @@ -58,7 +61,7 @@ public class TaskRunnerWorkItem implements Comparable return task; } - public SettableFuture getResult() + public ListenableFuture getResult() { return result; } @@ -89,7 +92,10 @@ public class TaskRunnerWorkItem implements Comparable @Override public int compareTo(TaskRunnerWorkItem taskRunnerWorkItem) { - return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime()); + return ComparisonChain.start() + .compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance()) + .compare(task.getId(), taskRunnerWorkItem.getTask().getId()) + .result(); } @Override From c3d229df7fe0abf9a330f21626473c926c8e2e18 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:17:36 -0700 Subject: [PATCH 05/19] [maven-release-plugin] prepare release druid-0.3.37 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index ece5afe2afd..b64d8eb8f1c 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/common/pom.xml b/common/pom.xml index 0bdde9af3af..1569bafd230 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 279267e0860..9082e137dc7 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.37-SNAPSHOT + 0.3.37 com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/examples/pom.xml b/examples/pom.xml index e7b9bed3492..55fbec2f317 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 78756dab24d..b6eaff41433 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 84b30034876..e2bddb202de 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/index-common/pom.xml b/index-common/pom.xml index cad48de1c93..2e3d07c2582 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/indexer/pom.xml b/indexer/pom.xml index 69b48632991..3d89274c559 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/merger/pom.xml b/merger/pom.xml index c25038ee0dc..2e80e878829 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/pom.xml b/pom.xml index 77dec29a00a..b8b21a82533 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.37-SNAPSHOT + 0.3.37 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 61a7ffb1e07..3c883b004a4 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 diff --git a/server/pom.xml b/server/pom.xml index 523b5338212..a510c16d17e 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37-SNAPSHOT + 0.3.37 From d879c9a41ece30600280d32519b8ba92456c05da Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:17:42 -0700 Subject: [PATCH 06/19] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index b64d8eb8f1c..f98feb959c3 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index 1569bafd230..06afffef778 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 9082e137dc7..27122bd8286 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.37 + 0.3.38-SNAPSHOT com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 55fbec2f317..c5a121a62d3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index b6eaff41433..fbcd65a3252 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index e2bddb202de..709a6f76c5a 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 2e3d07c2582..d451da5099a 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index 3d89274c559..0f9620c44e2 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 2e80e878829..49346941ef7 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/pom.xml b/pom.xml index b8b21a82533..2b936c950ba 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.37 + 0.3.38-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 3c883b004a4..081e5a00b08 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index a510c16d17e..32b5ceb9c22 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.37 + 0.3.38-SNAPSHOT From cdbdf843c04f7da05d21a045a9bce52e1ff4ba65 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:51:01 -0700 Subject: [PATCH 07/19] Merger: Tweaks to task shutdown --- .../merger/coordinator/ForkingTaskRunner.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java index 0f6c8697e69..68d8ad12abf 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java @@ -125,11 +125,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider // time to adjust process holders synchronized (tasks) { - if (Thread.interrupted()) { - throw new InterruptedException(); + final TaskInfo taskInfo = tasks.get(task.getId()); + + if (taskInfo.shutdown) { + throw new IllegalStateException("Task has been shut down!"); } - final TaskInfo taskInfo = tasks.get(task.getId()); if (taskInfo == null) { throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId()); } @@ -227,11 +228,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider return TaskStatus.failure(task.getId()); } } - catch (InterruptedException e) { - log.info(e, "Interrupted during execution"); - return TaskStatus.failure(task.getId()); - } - catch (IOException e) { + catch (Exception e) { + log.info(e, "Exception caught during execution"); throw Throwables.propagate(e); } finally { @@ -288,9 +286,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider log.info("Ignoring request to cancel unknown task: %s", taskid); return; } - } - taskInfo.statusFuture.cancel(true); + taskInfo.shutdown = true; + } if (taskInfo.processHolder != null) { final int shutdowns = taskInfo.processHolder.shutdowns.getAndIncrement(); @@ -397,6 +395,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider private static class TaskInfo { private final ListenableFuture statusFuture; + private volatile boolean shutdown = false; private volatile ProcessHolder processHolder = null; private TaskInfo(ListenableFuture statusFuture) From 21c2aa5793f52009e2bf3c0e5a308b57693ad22e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:55:46 -0700 Subject: [PATCH 08/19] [maven-release-plugin] prepare release druid-0.3.38 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index f98feb959c3..b7e31a0ce54 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/common/pom.xml b/common/pom.xml index 06afffef778..6b5cea31eea 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 27122bd8286..90bd3ae4e95 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.38-SNAPSHOT + 0.3.38 com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/examples/pom.xml b/examples/pom.xml index c5a121a62d3..6a3d7cf42cf 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index fbcd65a3252..e5dc69000ba 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 709a6f76c5a..91465475b20 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/index-common/pom.xml b/index-common/pom.xml index d451da5099a..0c35c32845b 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/indexer/pom.xml b/indexer/pom.xml index 0f9620c44e2..293dd236ebe 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/merger/pom.xml b/merger/pom.xml index 49346941ef7..5351aadf7cd 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/pom.xml b/pom.xml index 2b936c950ba..350a8753613 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.38-SNAPSHOT + 0.3.38 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 081e5a00b08..d30d09b79b8 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 diff --git a/server/pom.xml b/server/pom.xml index 32b5ceb9c22..8b740626e8b 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38-SNAPSHOT + 0.3.38 From a166344297557ed3b41d07535b2573c04de53e21 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 15 Apr 2013 18:55:53 -0700 Subject: [PATCH 09/19] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index b7e31a0ce54..ea4c4259adb 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index 6b5cea31eea..6e093648e2a 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 90bd3ae4e95..399dfc51524 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.38 + 0.3.39-SNAPSHOT com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 6a3d7cf42cf..27b0eef39d2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index e5dc69000ba..6569b377f42 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 91465475b20..f0bb40cff2d 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 0c35c32845b..1a6458a8809 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index 293dd236ebe..fe231aa63ca 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 5351aadf7cd..5d3d9eebdf6 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/pom.xml b/pom.xml index 350a8753613..27f4a264693 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.38 + 0.3.39-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index d30d09b79b8..f87c3cc38c6 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index 8b740626e8b..9fb76b0be13 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.38 + 0.3.39-SNAPSHOT From de5622d7924360d146b8aaf1435877c11df51d05 Mon Sep 17 00:00:00 2001 From: xvrl Date: Tue, 9 Apr 2013 17:55:21 -0700 Subject: [PATCH 10/19] bump up druid version to 0.4.0 --- realtime/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/pom.xml b/realtime/pom.xml index f87c3cc38c6..0e9c8c32d02 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT From 0c68bd1b1d146e3b1618ac6c502809cb4bbdcd0f Mon Sep 17 00:00:00 2001 From: xvrl Date: Tue, 9 Apr 2013 17:51:23 -0700 Subject: [PATCH 11/19] rename MetricSelectorFactory to ColumnSelectorFactory --- .../com/metamx/druid/aggregation/AggregatorFactory.java | 8 ++++---- .../metamx/druid/aggregation/CountAggregatorFactory.java | 6 +++--- .../druid/aggregation/DoubleSumAggregatorFactory.java | 6 +++--- .../druid/aggregation/HistogramAggregatorFactory.java | 6 +++--- .../druid/aggregation/JavaScriptAggregatorFactory.java | 6 +++--- .../druid/aggregation/LongSumAggregatorFactory.java | 6 +++--- .../metamx/druid/aggregation/MaxAggregatorFactory.java | 6 +++--- .../metamx/druid/aggregation/MinAggregatorFactory.java | 6 +++--- .../druid/aggregation/ToLowerCaseAggregatorFactory.java | 6 +++--- ...ricSelectorFactory.java => ColumnSelectorFactory.java} | 2 +- .../java/com/metamx/druid/index/v1/IncrementalIndex.java | 4 ++-- .../java/com/metamx/druid/index/v1/processing/Cursor.java | 4 ++-- 12 files changed, 33 insertions(+), 33 deletions(-) rename common/src/main/java/com/metamx/druid/processing/{MetricSelectorFactory.java => ColumnSelectorFactory.java} (96%) diff --git a/common/src/main/java/com/metamx/druid/aggregation/AggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/AggregatorFactory.java index a924187333f..400217d0bc4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/AggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/AggregatorFactory.java @@ -21,7 +21,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.util.Comparator; import java.util.List; @@ -29,7 +29,7 @@ import java.util.List; /** * Processing related interface * - * An AggregatorFactory is an object that knows how to generate an Aggregator using a MetricSelectorFactory. + * An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory. * * This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects * without making any assumptions about how they are pulling values out of the base data. That is, the data is @@ -48,8 +48,8 @@ import java.util.List; }) public interface AggregatorFactory { - public Aggregator factorize(MetricSelectorFactory metricFactory); - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory); + public Aggregator factorize(ColumnSelectorFactory metricFactory); + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory); public Comparator getComparator(); /** diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java index 507bb7fa79f..fe7d53e9ff4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Longs; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.util.Comparator; import java.util.List; @@ -44,13 +44,13 @@ public class CountAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new CountAggregator(name); } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new CountBufferAggregator(); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java index 1c3e2a85ada..86469a37792 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java @@ -22,7 +22,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Doubles; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.Arrays; @@ -49,7 +49,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new DoubleSumAggregator( name, @@ -58,7 +58,7 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new DoubleSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName)); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java index 6a5eaa5d238..8d39cbb8935 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; import com.google.common.primitives.Floats; import com.google.common.primitives.Longs; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import org.apache.commons.codec.binary.Base64; import java.nio.ByteBuffer; @@ -56,7 +56,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i); } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new HistogramAggregator( name, @@ -66,7 +66,7 @@ public class HistogramAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new HistogramBufferAggregator( metricFactory.makeFloatMetricSelector(fieldName), diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 119c88b46b2..1d0bb9da863 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -26,7 +26,7 @@ import com.google.common.collect.Lists; import com.google.common.primitives.Doubles; import com.metamx.common.IAE; import com.metamx.druid.processing.FloatMetricSelector; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextAction; @@ -66,7 +66,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(final MetricSelectorFactory metricFactory) + public Aggregator factorize(final ColumnSelectorFactory metricFactory) { return new JavaScriptAggregator( name, @@ -83,7 +83,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(final MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) { return new JavaScriptBufferAggregator( Lists.transform( diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java index fb29296a319..d84e22041db 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java @@ -22,7 +22,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Longs; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.Arrays; @@ -49,7 +49,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new LongSumAggregator( name, @@ -58,7 +58,7 @@ public class LongSumAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new LongSumBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName)); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java index 110ce8e6b35..fd66aeae90d 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java @@ -22,7 +22,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Doubles; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.Arrays; @@ -49,13 +49,13 @@ public class MaxAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new MaxAggregator(name, metricFactory.makeFloatMetricSelector(fieldName)); } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new MaxBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName)); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java index 8012b9c2d70..ed1e82cc9b5 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java @@ -22,7 +22,7 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Doubles; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.Arrays; @@ -49,13 +49,13 @@ public class MinAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new MinAggregator(name, metricFactory.makeFloatMetricSelector(fieldName)); } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new MinBufferAggregator(metricFactory.makeFloatMetricSelector(fieldName)); } diff --git a/common/src/main/java/com/metamx/druid/aggregation/ToLowerCaseAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/ToLowerCaseAggregatorFactory.java index 6d16b10be06..fcd782a9f01 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/ToLowerCaseAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/ToLowerCaseAggregatorFactory.java @@ -19,7 +19,7 @@ package com.metamx.druid.aggregation; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import java.util.Comparator; import java.util.List; @@ -36,13 +36,13 @@ public class ToLowerCaseAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(MetricSelectorFactory metricFactory) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return baseAggregatorFactory.factorize(metricFactory); } @Override - public BufferAggregator factorizeBuffered(MetricSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return baseAggregatorFactory.factorizeBuffered(metricFactory); } diff --git a/common/src/main/java/com/metamx/druid/processing/MetricSelectorFactory.java b/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java similarity index 96% rename from common/src/main/java/com/metamx/druid/processing/MetricSelectorFactory.java rename to common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java index 5864766787f..a314dceeff4 100644 --- a/common/src/main/java/com/metamx/druid/processing/MetricSelectorFactory.java +++ b/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java @@ -22,7 +22,7 @@ package com.metamx.druid.processing; /** * Factory class for MetricSelectors */ -public interface MetricSelectorFactory +public interface ColumnSelectorFactory { public FloatMetricSelector makeFloatMetricSelector(String metricName); public ComplexMetricSelector makeComplexMetricSelector(String metricName); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java index 624d6d4b375..c368d688afe 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java @@ -42,9 +42,9 @@ import com.metamx.druid.index.v1.serde.ComplexMetrics; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedRow; import com.metamx.druid.input.Row; +import com.metamx.druid.processing.ColumnSelectorFactory; import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; -import com.metamx.druid.processing.MetricSelectorFactory; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -169,7 +169,7 @@ public class IncrementalIndex implements Iterable for (int i = 0; i < metrics.length; ++i) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - new MetricSelectorFactory() + new ColumnSelectorFactory() { @Override public FloatMetricSelector makeFloatMetricSelector(String metric) diff --git a/server/src/main/java/com/metamx/druid/index/v1/processing/Cursor.java b/server/src/main/java/com/metamx/druid/index/v1/processing/Cursor.java index 419d23d11ee..c304d66b0d6 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/processing/Cursor.java +++ b/server/src/main/java/com/metamx/druid/index/v1/processing/Cursor.java @@ -19,12 +19,12 @@ package com.metamx.druid.index.v1.processing; -import com.metamx.druid.processing.MetricSelectorFactory; +import com.metamx.druid.processing.ColumnSelectorFactory; import org.joda.time.DateTime; /** */ -public interface Cursor extends MetricSelectorFactory, DimensionSelectorFactory +public interface Cursor extends ColumnSelectorFactory, DimensionSelectorFactory { public DateTime getTime(); public void advance(); From c83631928cbdc017b3bab4d1cb2cf3f91d641a42 Mon Sep 17 00:00:00 2001 From: xvrl Date: Thu, 11 Apr 2013 14:42:19 -0700 Subject: [PATCH 12/19] object column selectors --- .../processing/ColumnSelectorFactory.java | 1 + .../processing/ObjectColumnSelector.java | 26 ++ .../druid/index/v1/IncrementalIndex.java | 52 ++++ .../v1/IncrementalIndexStorageAdapter.java | 56 ++++ .../druid/index/v1/IndexStorageAdapter.java | 116 +++++++- .../v1/QueryableIndexStorageAdapter.java | 262 ++++++++++++++++++ 6 files changed, 507 insertions(+), 6 deletions(-) create mode 100644 common/src/main/java/com/metamx/druid/processing/ObjectColumnSelector.java diff --git a/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java b/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java index a314dceeff4..250ab22ab11 100644 --- a/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java +++ b/common/src/main/java/com/metamx/druid/processing/ColumnSelectorFactory.java @@ -26,4 +26,5 @@ public interface ColumnSelectorFactory { public FloatMetricSelector makeFloatMetricSelector(String metricName); public ComplexMetricSelector makeComplexMetricSelector(String metricName); + public ObjectColumnSelector makeObjectColumnSelector(String columnName); } diff --git a/common/src/main/java/com/metamx/druid/processing/ObjectColumnSelector.java b/common/src/main/java/com/metamx/druid/processing/ObjectColumnSelector.java new file mode 100644 index 00000000000..b91a3e0b4d1 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/processing/ObjectColumnSelector.java @@ -0,0 +1,26 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.processing; + +public interface ObjectColumnSelector +{ + public Class classOfObject(); + public T get(); +} diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java index c368d688afe..4ee321a368e 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java @@ -36,6 +36,11 @@ import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.Aggregator; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.post.PostAggregator; +import com.metamx.druid.index.column.Column; +import com.metamx.druid.index.column.ComplexColumn; +import com.metamx.druid.index.column.DictionaryEncodedColumn; +import com.metamx.druid.index.column.GenericColumn; +import com.metamx.druid.index.column.ValueType; import com.metamx.druid.index.v1.serde.ComplexMetricExtractor; import com.metamx.druid.index.v1.serde.ComplexMetricSerde; import com.metamx.druid.index.v1.serde.ComplexMetrics; @@ -45,6 +50,7 @@ import com.metamx.druid.input.Row; import com.metamx.druid.processing.ColumnSelectorFactory; import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -189,6 +195,7 @@ public class IncrementalIndex implements Iterable public ComplexMetricSelector makeComplexMetricSelector(final String metric) { final String typeName = agg.getTypeName(); + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); if (serde == null) { @@ -213,6 +220,51 @@ public class IncrementalIndex implements Iterable } }; } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + if(typeName.equals("float")) return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return in.getFloatMetric(columnName); + } + }; + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in, columnName); + } + }; + } } ); } diff --git a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java index 8ec2cc10f52..479e93bf99e 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java @@ -43,6 +43,7 @@ import com.metamx.druid.index.v1.serde.ComplexMetrics; import com.metamx.druid.kv.IndexedInts; import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import com.metamx.druid.query.search.SearchHit; import com.metamx.druid.query.search.SearchQuery; import com.metamx.druid.query.search.SearchQuerySpec; @@ -359,6 +360,61 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } }; } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String columnName = column.toLowerCase(); + final Integer metricIndexInt = index.getMetricIndex(columnName); + + if(metricIndexInt != null) { + final int metricIndex = metricIndexInt; + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName)); + + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return serde.getObjectStrategy().getClazz(); + } + + @Override + public Object get() + { + return currEntry.getValue()[metricIndex].get(); + } + }; + } + + final Integer dimensionIndexInt = index.getDimensionIndex(columnName); + + if(dimensionIndexInt != null) { + final int dimensionIndex = dimensionIndexInt; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; + if(dimVals.length == 1) return dimVals[0]; + if(dimVals.length == 0) return null; + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued columns" + ); + } + }; + } + + return null; + } }; } } diff --git a/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java index 2d41ce338aa..dd93d33a42f 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java @@ -46,11 +46,13 @@ import com.metamx.druid.kv.IndexedInts; import com.metamx.druid.kv.ListIndexed; import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.Closeable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.Map; @@ -148,7 +150,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter ); } - final Map metricHolderCache = Maps.newHashMap(); + final Map columnHolderCache = Maps.newHashMap(); // This after call is not perfect, if there is an exception during processing, it will never get called, // but it's better than nothing and doing this properly all the time requires a lot more fixerating @@ -246,7 +248,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter public FloatMetricSelector makeFloatMetricSelector(String metric) { String metricName = metric.toLowerCase(); - IndexedFloats cachedFloats = (IndexedFloats) metricHolderCache.get(metric); + IndexedFloats cachedFloats = (IndexedFloats) columnHolderCache.get(metric); if (cachedFloats == null) { MetricHolder holder = index.metricVals.get(metricName); if (holder == null) { @@ -261,7 +263,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter } cachedFloats = holder.getFloatType(); - metricHolderCache.put(metricName, cachedFloats); + columnHolderCache.put(metricName, cachedFloats); } final IndexedFloats metricVals = cachedFloats; @@ -279,12 +281,13 @@ public class IndexStorageAdapter extends BaseStorageAdapter public ComplexMetricSelector makeComplexMetricSelector(String metric) { final String metricName = metric.toLowerCase(); - Indexed cachedComplex = (Indexed) metricHolderCache.get(metricName); + Indexed cachedComplex = (Indexed) columnHolderCache.get(metricName); + if (cachedComplex == null) { MetricHolder holder = index.metricVals.get(metricName); if (holder != null) { cachedComplex = holder.getComplexType(); - metricHolderCache.put(metricName, cachedComplex); + columnHolderCache.put(metricName, cachedComplex); } } @@ -308,6 +311,107 @@ public class IndexStorageAdapter extends BaseStorageAdapter } }; } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + + final String columnName = column.toLowerCase(); + Object cachedColumn = (Indexed) columnHolderCache.get(columnName); + + if (cachedColumn == null) { + MetricHolder holder = index.metricVals.get(columnName); + final String[] nameLookup = index.reverseDimLookup.get(columnName); + + if(nameLookup != null) { + cachedColumn = index.dimensionValues.get(columnName); + } + else if(holder != null) { + final MetricHolder.MetricType type = holder.getType(); + + if (type == MetricHolder.MetricType.COMPLEX) { + cachedColumn = holder.getComplexType(); + } + else { + cachedColumn = holder.getFloatType(); + } + } + + if(cachedColumn != null) { + columnHolderCache.put(columnName, cachedColumn); + } + } + + if (cachedColumn == null) { + return null; + } + + if(cachedColumn instanceof IndexedFloats) { + final IndexedFloats vals = (IndexedFloats)cachedColumn; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return vals.get(cursorOffset.getOffset()); + } + }; + } + + if(cachedColumn instanceof Indexed) { + final Indexed vals = (Indexed)cachedColumn; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return vals.getClazz(); + } + + @Override + public Object get() + { + return vals.get(cursorOffset.getOffset()); + } + }; + } + + if(cachedColumn instanceof DimensionColumn) { + final DimensionColumn vals = (DimensionColumn)cachedColumn; + + final String[] nameLookup = index.reverseDimLookup.get(columnName); + final int[] dimensionRowValues = vals.getDimensionRowValues(); + final int[][] dimensionExpansions = vals.getDimensionExpansions(); + + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + final int[] dimIds = dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]]; + if(dimIds.length == 1) return nameLookup[dimIds[0]]; + if(dimIds.length == 0) return null; + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued columns" + ); + } + }; + } + + return null; + } }; } } @@ -317,7 +421,7 @@ public class IndexStorageAdapter extends BaseStorageAdapter @Override public void run() { - for (Object object : metricHolderCache.values()) { + for (Object object : columnHolderCache.values()) { if (object instanceof Closeable) { Closeables.closeQuietly((Closeable) object); } diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 1cde70b5c73..883f43fd2c1 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -48,10 +48,12 @@ import com.metamx.druid.kv.IndexedInts; import com.metamx.druid.kv.IndexedIterable; import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.io.Closeable; import java.util.Iterator; import java.util.Map; @@ -254,6 +256,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter final Map genericColumnCache = Maps.newHashMap(); final Map complexColumnCache = Maps.newHashMap(); + final Map objectColumnCache = Maps.newHashMap(); + final GenericColumn timestamps = index.getTimeColumn().getGenericColumn(); final FunctionalIterator retVal = FunctionalIterator @@ -465,6 +469,131 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter } }; } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String columnName = column.toLowerCase(); + + Object cachedColumnVals = objectColumnCache.get(columnName); + + if (cachedColumnVals == null) { + Column holder = index.getColumn(columnName); + + if(holder != null) { + if(holder.getCapabilities().hasMultipleValues()) { + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued columns" + ); + } + final ValueType type = holder.getCapabilities().getType(); + + if(holder.getCapabilities().isDictionaryEncoded()) { + cachedColumnVals = holder.getDictionaryEncoding(); + } + else if(type == ValueType.COMPLEX) { + cachedColumnVals = holder.getComplexColumn(); + } + else { + cachedColumnVals = holder.getGenericColumn(); + } + } + + if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals); + } + + if (cachedColumnVals == null) { + return null; + } + + if(cachedColumnVals instanceof GenericColumn) { + final GenericColumn columnVals = (GenericColumn) cachedColumnVals; + final ValueType type = columnVals.getType(); + + if(type == ValueType.FLOAT) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return columnVals.getFloatSingleValueRow(cursorOffset.getOffset()); + } + }; + } + if(type == ValueType.LONG) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Long.TYPE; + } + + @Override + public Long get() + { + return columnVals.getLongSingleValueRow(cursorOffset.getOffset()); + } + }; + } + if(type == ValueType.STRING) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.getStringSingleValueRow(cursorOffset.getOffset()); + } + }; + } + } + + if (cachedColumnVals instanceof DictionaryEncodedColumn) { + final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.lookupName(columnVals.getSingleValueRow(cursorOffset.getOffset())); + } + }; + } + + final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return columnVals.getClazz(); + } + + @Override + public Object get() + { + return columnVals.getRowValue(cursorOffset.getOffset()); + } + }; + } }; } } @@ -486,6 +615,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter for (ComplexColumn complexColumn : complexColumnCache.values()) { Closeables.closeQuietly(complexColumn); } + for (Object column : complexColumnCache.values()) { + if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column); + } } } ); @@ -562,6 +694,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter { final Map genericColumnCache = Maps.newHashMap(); final Map complexColumnCache = Maps.newHashMap(); + final Map objectColumnCache = Maps.newHashMap(); + final GenericColumn timestamps = index.getTimeColumn().getGenericColumn(); final FunctionalIterator retVal = FunctionalIterator @@ -769,6 +903,131 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter } }; } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String columnName = column.toLowerCase(); + + Object cachedColumnVals = objectColumnCache.get(columnName); + + if (cachedColumnVals == null) { + Column holder = index.getColumn(columnName); + + if(holder != null) { + if(holder.getCapabilities().hasMultipleValues()) { + throw new UnsupportedOperationException( + "makeObjectColumnSelector does not support multivalued columns" + ); + } + final ValueType type = holder.getCapabilities().getType(); + + if(holder.getCapabilities().isDictionaryEncoded()) { + cachedColumnVals = holder.getDictionaryEncoding(); + } + else if(type == ValueType.COMPLEX) { + cachedColumnVals = holder.getComplexColumn(); + } + else { + cachedColumnVals = holder.getGenericColumn(); + } + } + + if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals); + } + + if (cachedColumnVals == null) { + return null; + } + + if(cachedColumnVals instanceof GenericColumn) { + final GenericColumn columnVals = (GenericColumn) cachedColumnVals; + final ValueType type = columnVals.getType(); + + if(type == ValueType.FLOAT) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return columnVals.getFloatSingleValueRow(currRow); + } + }; + } + if(type == ValueType.LONG) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Long.TYPE; + } + + @Override + public Long get() + { + return columnVals.getLongSingleValueRow(currRow); + } + }; + } + if(type == ValueType.STRING) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.getStringSingleValueRow(currRow); + } + }; + } + } + + if (cachedColumnVals instanceof DictionaryEncodedColumn) { + final DictionaryEncodedColumn columnVals = (DictionaryEncodedColumn) cachedColumnVals; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return String.class; + } + + @Override + public String get() + { + return columnVals.lookupName(columnVals.getSingleValueRow(currRow)); + } + }; + } + + final ComplexColumn columnVals = (ComplexColumn)cachedColumnVals; + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return columnVals.getClazz(); + } + + @Override + public Object get() + { + return columnVals.getRowValue(currRow); + } + }; + } }; } } @@ -788,6 +1047,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter for (ComplexColumn complexColumn : complexColumnCache.values()) { Closeables.closeQuietly(complexColumn); } + for (Object column : objectColumnCache.values()) { + if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column); + } } } ); From b66f69def6c55d5a8842ad36155b762d1b69c4cd Mon Sep 17 00:00:00 2001 From: xvrl Date: Wed, 20 Mar 2013 16:08:18 -0700 Subject: [PATCH 13/19] specify javascript aggregator functions individually to support getCombiningFactory() --- .../JavaScriptAggregatorFactory.java | 64 +++++++++++-------- .../aggregation/JavaScriptAggregatorTest.java | 38 +++++++---- 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 1d0bb9da863..9a594febb96 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -48,7 +48,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory private final String name; private final List fieldNames; - private final String script; + private final String fnAggregate; + private final String fnReset; + private final String fnCombine; + + private final JavaScriptAggregator.ScriptAggregator compiledScript; @@ -56,13 +60,19 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory public JavaScriptAggregatorFactory( @JsonProperty("name") String name, @JsonProperty("fieldNames") final List fieldNames, - @JsonProperty("script") final String expression + @JsonProperty("fnAggregate") final String fnAggregate, + @JsonProperty("fnReset") final String fnReset, + @JsonProperty("fnCombine") final String fnCombine ) { this.name = name; - this.script = expression; this.fieldNames = fieldNames; - this.compiledScript = compileScript(script); + + this.fnAggregate = fnAggregate; + this.fnReset = fnReset; + this.fnCombine = fnCombine; + + this.compiledScript = compileScript(fnAggregate, fnReset, fnCombine); } @Override @@ -116,7 +126,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory @Override public AggregatorFactory getCombiningFactory() { - throw new UnsupportedOperationException(); + return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine); } @Override @@ -144,8 +154,21 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @JsonProperty - public String getScript() { - return script; + public String getFnAggregate() + { + return fnAggregate; + } + + @JsonProperty + public String getFnReset() + { + return fnReset; + } + + @JsonProperty + public String getFnCombine() + { + return fnCombine; } @Override @@ -160,7 +183,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory try { MessageDigest md = MessageDigest.getInstance("SHA-1"); byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(); - byte[] sha1 = md.digest(script.getBytes()); + byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes()); return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length) .put(CACHE_TYPE_ID) @@ -197,21 +220,13 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory return "JavaScriptAggregatorFactory{" + "name='" + name + '\'' + ", fieldNames=" + fieldNames + - ", script='" + script + '\'' + + ", fnAggregate='" + fnAggregate + '\'' + + ", fnReset='" + fnReset + '\'' + + ", fnCombine='" + fnCombine + '\'' + '}'; } - protected static Function getScriptFunction(String name, ScriptableObject scope) - { - Object fun = scope.get(name, scope); - if (fun instanceof Function) { - return (Function) fun; - } else { - throw new IAE("Function [%s] not defined in script", name); - } - } - - public static JavaScriptAggregator.ScriptAggregator compileScript(final String script) + public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine) { final ContextFactory contextFactory = ContextFactory.getGlobal(); Context context = contextFactory.enterContext(); @@ -219,12 +234,9 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory final ScriptableObject scope = context.initStandardObjects(); - Script compiledScript = context.compileString(script, "script", 1, null); - compiledScript.exec(context, scope); - - final Function fnAggregate = getScriptFunction("aggregate", scope); - final Function fnReset = getScriptFunction("reset", scope); - final Function fnCombine = getScriptFunction("combine", scope); + final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null); + final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null); + final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null); Context.exit(); return new JavaScriptAggregator.ScriptAggregator() diff --git a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java index bbe97070752..bb1a3a39075 100644 --- a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java +++ b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java @@ -20,25 +20,29 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; -import com.google.common.primitives.Doubles; +import com.google.common.collect.Maps; import com.metamx.druid.processing.FloatMetricSelector; import org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; public class JavaScriptAggregatorTest { - protected static final String sumLogATimesBPlusTen = - "function aggregate(current, a, b) { return current + (Math.log(a) * b) }" - + "function combine(a,b) { return a + b }" - + "function reset() { return 10 }"; + protected static final Map sumLogATimesBPlusTen = Maps.newHashMap(); + protected static final Map scriptDoubleSum = Maps.newHashMap(); - protected static final String scriptDoubleSum = - "function aggregate(current, a) { return current + a }" - + "function combine(a,b) { return a + b }" - + "function reset() { return 0 }"; + static { + sumLogATimesBPlusTen.put("fnAggregate", "function aggregate(current, a, b) { return current + (Math.log(a) * b) }"); + sumLogATimesBPlusTen.put("fnReset", "function reset() { return 10 }"); + sumLogATimesBPlusTen.put("fnCombine", "function combine(a,b) { return a + b }"); + + scriptDoubleSum.put("fnAggregate", "function aggregate(current, a) { return current + a }"); + scriptDoubleSum.put("fnReset", "function reset() { return 0 }"); + scriptDoubleSum.put("fnCombine", "function combine(a,b) { return a + b }"); + } private static void aggregate(TestFloatMetricSelector selector1, TestFloatMetricSelector selector2, Aggregator agg) { @@ -69,10 +73,14 @@ public class JavaScriptAggregatorTest final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f}); final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f}); + Map script = sumLogATimesBPlusTen; + JavaScriptAggregator agg = new JavaScriptAggregator( "billy", Arrays.asList(selector1, selector2), - JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen) + JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), + script.get("fnReset"), + script.get("fnCombine")) ); agg.reset(); @@ -103,9 +111,12 @@ public class JavaScriptAggregatorTest final TestFloatMetricSelector selector1 = new TestFloatMetricSelector(new float[]{42.12f, 9f}); final TestFloatMetricSelector selector2 = new TestFloatMetricSelector(new float[]{2f, 3f}); + Map script = sumLogATimesBPlusTen; JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator( Arrays.asList(selector1, selector2), - JavaScriptAggregatorFactory.compileScript(sumLogATimesBPlusTen) + JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), + script.get("fnReset"), + script.get("fnCombine")) ); ByteBuffer buf = ByteBuffer.allocateDirect(32); @@ -150,10 +161,13 @@ public class JavaScriptAggregatorTest } */ + Map script = scriptDoubleSum; JavaScriptAggregator aggRhino = new JavaScriptAggregator( "billy", Lists.asList(selector, new FloatMetricSelector[]{}), - JavaScriptAggregatorFactory.compileScript(scriptDoubleSum) + JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), + script.get("fnReset"), + script.get("fnCombine")) ); DoubleSumAggregator doubleAgg = new DoubleSumAggregator("billy", selector); From 66a52ed282673c5305387e93168ce56d3765f61f Mon Sep 17 00:00:00 2001 From: xvrl Date: Thu, 11 Apr 2013 14:43:47 -0700 Subject: [PATCH 14/19] javascript aggregator now uses column selector --- .../aggregation/JavaScriptAggregator.java | 10 ++--- .../JavaScriptAggregatorFactory.java | 29 ++++++------ .../JavaScriptBufferAggregator.java | 8 ++-- .../druid/processing/MetricSelectorUtils.java | 5 --- .../aggregation/JavaScriptAggregatorTest.java | 8 ++-- .../aggregation/MetricSelectorUtils.java | 45 +++++++++++++++++++ 6 files changed, 73 insertions(+), 32 deletions(-) delete mode 100644 common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java create mode 100644 common/src/test/java/com/metamx/druid/aggregation/MetricSelectorUtils.java diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java index dbb1a85bbb5..6ce103e2af4 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregator.java @@ -20,7 +20,7 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; -import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import java.util.List; @@ -28,7 +28,7 @@ public class JavaScriptAggregator implements Aggregator { static interface ScriptAggregator { - public double aggregate(double current, FloatMetricSelector[] selectorList); + public double aggregate(double current, ObjectColumnSelector[] selectorList); public double combine(double a, double b); @@ -38,15 +38,15 @@ public class JavaScriptAggregator implements Aggregator } private final String name; - private final FloatMetricSelector[] selectorList; + private final ObjectColumnSelector[] selectorList; private final ScriptAggregator script; private volatile double current; - public JavaScriptAggregator(String name, List selectorList, ScriptAggregator script) + public JavaScriptAggregator(String name, List selectorList, ScriptAggregator script) { this.name = name; - this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{}); + this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{}); this.script = script; this.current = script.reset(); diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java index 9a594febb96..af68b26df42 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java @@ -21,18 +21,16 @@ package com.metamx.druid.aggregation; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.primitives.Doubles; -import com.metamx.common.IAE; -import com.metamx.druid.processing.FloatMetricSelector; import com.metamx.druid.processing.ColumnSelectorFactory; - +import com.metamx.druid.processing.ObjectColumnSelector; import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextAction; import org.mozilla.javascript.ContextFactory; import org.mozilla.javascript.Function; -import org.mozilla.javascript.Script; import org.mozilla.javascript.ScriptableObject; import javax.annotation.Nullable; @@ -76,16 +74,18 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public Aggregator factorize(final ColumnSelectorFactory metricFactory) + public Aggregator factorize(final ColumnSelectorFactory columnFactory) { return new JavaScriptAggregator( name, Lists.transform( fieldNames, - new com.google.common.base.Function() + new com.google.common.base.Function() { @Override - public FloatMetricSelector apply(@Nullable String s) { return metricFactory.makeFloatMetricSelector(s); } + public ObjectColumnSelector apply(@Nullable String s) { + return columnFactory.makeObjectColumnSelector(s); + } } ), compiledScript @@ -93,17 +93,16 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory } @Override - public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) + public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory) { return new JavaScriptBufferAggregator( Lists.transform( fieldNames, - new com.google.common.base.Function() + new com.google.common.base.Function() { @Override - public FloatMetricSelector apply(@Nullable String s) - { - return metricFactory.makeFloatMetricSelector(s); + public ObjectColumnSelector apply(@Nullable String s) { + return columnSelectorFactory.makeObjectColumnSelector(s); } } ), @@ -182,8 +181,8 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory { try { MessageDigest md = MessageDigest.getInstance("SHA-1"); - byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(); - byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes()); + byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8); + byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8)); return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length) .put(CACHE_TYPE_ID) @@ -242,7 +241,7 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory return new JavaScriptAggregator.ScriptAggregator() { @Override - public double aggregate(final double current, final FloatMetricSelector[] selectorList) + public double aggregate(final double current, final ObjectColumnSelector[] selectorList) { Context cx = Context.getCurrentContext(); if(cx == null) cx = contextFactory.enterContext(); diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java index 88f97b6a5dd..90cb171d972 100644 --- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java +++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptBufferAggregator.java @@ -20,22 +20,24 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; +import com.metamx.druid.processing.ComplexMetricSelector; import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import java.nio.ByteBuffer; import java.util.List; public class JavaScriptBufferAggregator implements BufferAggregator { - private final FloatMetricSelector[] selectorList; + private final ObjectColumnSelector[] selectorList; private final JavaScriptAggregator.ScriptAggregator script; public JavaScriptBufferAggregator( - List selectorList, + List selectorList, JavaScriptAggregator.ScriptAggregator script ) { - this.selectorList = Lists.newArrayList(selectorList).toArray(new FloatMetricSelector[]{}); + this.selectorList = Lists.newArrayList(selectorList).toArray(new ObjectColumnSelector[]{}); this.script = script; } diff --git a/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java b/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java deleted file mode 100644 index 0db847b70d9..00000000000 --- a/common/src/main/java/com/metamx/druid/processing/MetricSelectorUtils.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.metamx.druid.processing; - -public class MetricSelectorUtils -{ -} diff --git a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java index bb1a3a39075..98e4576a731 100644 --- a/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java +++ b/common/src/test/java/com/metamx/druid/aggregation/JavaScriptAggregatorTest.java @@ -21,7 +21,7 @@ package com.metamx.druid.aggregation; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; import org.junit.Assert; import org.junit.Test; @@ -77,7 +77,7 @@ public class JavaScriptAggregatorTest JavaScriptAggregator agg = new JavaScriptAggregator( "billy", - Arrays.asList(selector1, selector2), + Arrays.asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)), JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), script.get("fnReset"), script.get("fnCombine")) @@ -113,7 +113,7 @@ public class JavaScriptAggregatorTest Map script = sumLogATimesBPlusTen; JavaScriptBufferAggregator agg = new JavaScriptBufferAggregator( - Arrays.asList(selector1, selector2), + Arrays.asList(MetricSelectorUtils.wrap(selector1), MetricSelectorUtils.wrap(selector2)), JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), script.get("fnReset"), script.get("fnCombine")) @@ -164,7 +164,7 @@ public class JavaScriptAggregatorTest Map script = scriptDoubleSum; JavaScriptAggregator aggRhino = new JavaScriptAggregator( "billy", - Lists.asList(selector, new FloatMetricSelector[]{}), + Lists.asList(MetricSelectorUtils.wrap(selector), new ObjectColumnSelector[]{}), JavaScriptAggregatorFactory.compileScript(script.get("fnAggregate"), script.get("fnReset"), script.get("fnCombine")) diff --git a/common/src/test/java/com/metamx/druid/aggregation/MetricSelectorUtils.java b/common/src/test/java/com/metamx/druid/aggregation/MetricSelectorUtils.java new file mode 100644 index 00000000000..da49af820e2 --- /dev/null +++ b/common/src/test/java/com/metamx/druid/aggregation/MetricSelectorUtils.java @@ -0,0 +1,45 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.aggregation; + +import com.metamx.druid.processing.ComplexMetricSelector; +import com.metamx.druid.processing.FloatMetricSelector; +import com.metamx.druid.processing.ObjectColumnSelector; + +public class MetricSelectorUtils +{ + public static ObjectColumnSelector wrap(final FloatMetricSelector selector) + { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return selector.get(); + } + }; + } +} From 2afe2bdeaecec09810b1a85b0e1e14eaa8291140 Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 15 Apr 2013 09:28:32 -0700 Subject: [PATCH 15/19] formatting --- .../druid/index/v1/IncrementalIndex.java | 26 ++++++++++--------- .../v1/QueryableIndexStorageAdapter.java | 18 ++++++++----- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java index 4ee321a368e..cde0c61529b 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java @@ -227,20 +227,22 @@ public class IncrementalIndex implements Iterable final String typeName = agg.getTypeName(); final String columnName = column.toLowerCase(); - if(typeName.equals("float")) return new ObjectColumnSelector() - { - @Override - public Class classOfObject() + if(typeName.equals("float")) { + return new ObjectColumnSelector() { - return Float.TYPE; - } + @Override + public Class classOfObject() + { + return Float.TYPE; + } - @Override - public Float get() - { - return in.getFloatMetric(columnName); - } - }; + @Override + public Float get() + { + return in.getFloatMetric(columnName); + } + }; + } final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 883f43fd2c1..2f41b0d08a0 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -35,6 +35,7 @@ import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.brita.BitmapIndexSelector; import com.metamx.druid.index.brita.Filter; import com.metamx.druid.index.column.Column; +import com.metamx.druid.index.column.ColumnCapabilities; import com.metamx.druid.index.column.ColumnSelector; import com.metamx.druid.index.column.ComplexColumn; import com.metamx.druid.index.column.DictionaryEncodedColumn; @@ -481,17 +482,18 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter Column holder = index.getColumn(columnName); if(holder != null) { - if(holder.getCapabilities().hasMultipleValues()) { + final ColumnCapabilities capabilities = holder.getCapabilities(); + + if(capabilities.hasMultipleValues()) { throw new UnsupportedOperationException( "makeObjectColumnSelector does not support multivalued columns" ); } - final ValueType type = holder.getCapabilities().getType(); - if(holder.getCapabilities().isDictionaryEncoded()) { + if(capabilities.isDictionaryEncoded()) { cachedColumnVals = holder.getDictionaryEncoding(); } - else if(type == ValueType.COMPLEX) { + else if(capabilities.getType() == ValueType.COMPLEX) { cachedColumnVals = holder.getComplexColumn(); } else { @@ -499,7 +501,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter } } - if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals); + if(cachedColumnVals != null) { + objectColumnCache.put(columnName, cachedColumnVals); + } } if (cachedColumnVals == null) { @@ -616,7 +620,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter Closeables.closeQuietly(complexColumn); } for (Object column : complexColumnCache.values()) { - if(column instanceof Closeable) Closeables.closeQuietly((Closeable)column); + if(column instanceof Closeable) { + Closeables.closeQuietly((Closeable)column); + } } } } From 4597d7f44ec8611acd0bd278f784323b7ee1f58c Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 15 Apr 2013 14:31:24 -0700 Subject: [PATCH 16/19] remove IndexStorageAdapter --- .../druid/index/v1/IndexStorageAdapter.java | 627 ------------------ 1 file changed, 627 deletions(-) delete mode 100644 server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java diff --git a/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java deleted file mode 100644 index dd93d33a42f..00000000000 --- a/server/src/main/java/com/metamx/druid/index/v1/IndexStorageAdapter.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.index.v1; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; -import com.metamx.common.Pair; -import com.metamx.common.collect.MoreIterators; -import com.metamx.common.guava.FunctionalIterator; -import com.metamx.common.logger.Logger; -import com.metamx.druid.BaseStorageAdapter; -import com.metamx.druid.Capabilities; -import com.metamx.druid.QueryGranularity; -import com.metamx.druid.index.brita.BitmapIndexSelector; -import com.metamx.druid.index.brita.Filter; -import com.metamx.druid.index.v1.processing.ArrayBasedOffset; -import com.metamx.druid.index.v1.processing.Cursor; -import com.metamx.druid.index.v1.processing.DimensionSelector; -import com.metamx.druid.index.v1.processing.Offset; -import com.metamx.druid.index.v1.processing.StartLimitedOffset; -import com.metamx.druid.kv.ArrayBasedIndexedInts; -import com.metamx.druid.kv.ArrayIndexed; -import com.metamx.druid.kv.Indexed; -import com.metamx.druid.kv.IndexedFloats; -import com.metamx.druid.kv.IndexedInts; -import com.metamx.druid.kv.ListIndexed; -import com.metamx.druid.processing.ComplexMetricSelector; -import com.metamx.druid.processing.FloatMetricSelector; -import com.metamx.druid.processing.ObjectColumnSelector; -import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; -import org.joda.time.DateTime; -import org.joda.time.Interval; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; - -/** - */ -public class IndexStorageAdapter extends BaseStorageAdapter -{ - private final Logger log = new Logger(IndexStorageAdapter.class); - - private final Index index; - - private final int[] ids; - - private final Capabilities capabilities; - - public IndexStorageAdapter( - Index index - ) - { - this.index = index; - - capabilities = Capabilities.builder() - .dimensionValuesSorted(isReverseDimSorted()) - .build(); - - ids = new int[index.timeOffsets.length]; - for (int i = 0; i < ids.length; i++) { - ids[i] = i; - } - } - - @Override - public String getSegmentIdentifier() - { - throw new UnsupportedOperationException(); - } - - @Override - public Interval getInterval() - { - return index.dataInterval; - } - - @Override - public int getDimensionCardinality(String dimension) - { - final String[] strings = index.reverseDimLookup.get(dimension); - return strings == null ? 0 : strings.length; - } - - @Override - public DateTime getMinTime() - { - return new DateTime(index.timeOffsets[0]); - } - - @Override - public DateTime getMaxTime() - { - return new DateTime(index.timeOffsets[index.timeOffsets.length - 1]); - } - - @Override - public Iterable makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran) - { - Interval actualIntervalTmp = interval; - if (!actualIntervalTmp.overlaps(index.dataInterval)) { - return ImmutableList.of(); - } - - if (actualIntervalTmp.getStart().isBefore(index.dataInterval.getStart())) { - actualIntervalTmp = actualIntervalTmp.withStart(index.dataInterval.getStart()); - } - if (actualIntervalTmp.getEnd().isAfter(index.dataInterval.getEnd())) { - actualIntervalTmp = actualIntervalTmp.withEnd(index.dataInterval.getEnd()); - } - - final Interval actualInterval = actualIntervalTmp; - - final Pair intervalStartAndEnd = computeTimeStartEnd(actualInterval); - - return new Iterable() - { - @Override - public Iterator iterator() - { - final Offset baseOffset; - if (filter == null) { - baseOffset = new ArrayBasedOffset(ids, intervalStartAndEnd.lhs); - } else { - baseOffset = new StartLimitedOffset( - new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index))), - intervalStartAndEnd.lhs - ); - } - - final Map columnHolderCache = Maps.newHashMap(); - - // This after call is not perfect, if there is an exception during processing, it will never get called, - // but it's better than nothing and doing this properly all the time requires a lot more fixerating - return MoreIterators.after( - FunctionalIterator - .create(gran.iterable(actualInterval.getStartMillis(), actualInterval.getEndMillis()).iterator()) - .keep( - new Function() - { - @Override - public Cursor apply(final Long intervalStart) - { - final Offset offset = new TimestampCheckingOffset( - baseOffset, - index.timeOffsets, - Math.min(actualInterval.getEndMillis(), gran.next(intervalStart)) - ); - - return new Cursor() - { - - private final Offset initOffset = offset.clone(); - private Offset cursorOffset = offset; - private final DateTime timestamp = gran.toDateTime(intervalStart); - - @Override - public DateTime getTime() - { - return timestamp; - } - - @Override - public void advance() - { - cursorOffset.increment(); - } - - @Override - public boolean isDone() - { - return !cursorOffset.withinBounds(); - } - - @Override - public void reset() - { - cursorOffset = initOffset.clone(); - } - - @Override - public DimensionSelector makeDimensionSelector(String dimension) - { - final String dimensionName = dimension.toLowerCase(); - final String[] nameLookup = index.reverseDimLookup.get(dimensionName); - if (nameLookup == null) { - return null; - } - - return new DimensionSelector() - { - final Map dimValLookup = index.dimIdLookup.get(dimensionName); - final DimensionColumn dimColumn = index.dimensionValues.get(dimensionName); - final int[][] dimensionExpansions = dimColumn.getDimensionExpansions(); - final int[] dimensionRowValues = dimColumn.getDimensionRowValues(); - - @Override - public IndexedInts getRow() - { - return new ArrayBasedIndexedInts(dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]]); - } - - @Override - public int getValueCardinality() - { - return nameLookup.length; - } - - @Override - public String lookupName(int id) - { - return nameLookup[id]; - } - - @Override - public int lookupId(String name) - { - final Integer retVal = dimValLookup.get(name); - - return retVal == null ? -1 : retVal; - } - }; - } - - @Override - public FloatMetricSelector makeFloatMetricSelector(String metric) - { - String metricName = metric.toLowerCase(); - IndexedFloats cachedFloats = (IndexedFloats) columnHolderCache.get(metric); - if (cachedFloats == null) { - MetricHolder holder = index.metricVals.get(metricName); - if (holder == null) { - return new FloatMetricSelector() - { - @Override - public float get() - { - return 0.0f; - } - }; - } - - cachedFloats = holder.getFloatType(); - columnHolderCache.put(metricName, cachedFloats); - } - - final IndexedFloats metricVals = cachedFloats; - return new FloatMetricSelector() - { - @Override - public float get() - { - return metricVals.get(cursorOffset.getOffset()); - } - }; - } - - @Override - public ComplexMetricSelector makeComplexMetricSelector(String metric) - { - final String metricName = metric.toLowerCase(); - Indexed cachedComplex = (Indexed) columnHolderCache.get(metricName); - - if (cachedComplex == null) { - MetricHolder holder = index.metricVals.get(metricName); - if (holder != null) { - cachedComplex = holder.getComplexType(); - columnHolderCache.put(metricName, cachedComplex); - } - } - - if (cachedComplex == null) { - return null; - } - - final Indexed vals = cachedComplex; - return new ComplexMetricSelector() - { - @Override - public Class classOfObject() - { - return vals.getClazz(); - } - - @Override - public Object get() - { - return vals.get(cursorOffset.getOffset()); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - - final String columnName = column.toLowerCase(); - Object cachedColumn = (Indexed) columnHolderCache.get(columnName); - - if (cachedColumn == null) { - MetricHolder holder = index.metricVals.get(columnName); - final String[] nameLookup = index.reverseDimLookup.get(columnName); - - if(nameLookup != null) { - cachedColumn = index.dimensionValues.get(columnName); - } - else if(holder != null) { - final MetricHolder.MetricType type = holder.getType(); - - if (type == MetricHolder.MetricType.COMPLEX) { - cachedColumn = holder.getComplexType(); - } - else { - cachedColumn = holder.getFloatType(); - } - } - - if(cachedColumn != null) { - columnHolderCache.put(columnName, cachedColumn); - } - } - - if (cachedColumn == null) { - return null; - } - - if(cachedColumn instanceof IndexedFloats) { - final IndexedFloats vals = (IndexedFloats)cachedColumn; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Float.TYPE; - } - - @Override - public Float get() - { - return vals.get(cursorOffset.getOffset()); - } - }; - } - - if(cachedColumn instanceof Indexed) { - final Indexed vals = (Indexed)cachedColumn; - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return vals.getClazz(); - } - - @Override - public Object get() - { - return vals.get(cursorOffset.getOffset()); - } - }; - } - - if(cachedColumn instanceof DimensionColumn) { - final DimensionColumn vals = (DimensionColumn)cachedColumn; - - final String[] nameLookup = index.reverseDimLookup.get(columnName); - final int[] dimensionRowValues = vals.getDimensionRowValues(); - final int[][] dimensionExpansions = vals.getDimensionExpansions(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return String.class; - } - - @Override - public String get() - { - final int[] dimIds = dimensionExpansions[dimensionRowValues[cursorOffset.getOffset()]]; - if(dimIds.length == 1) return nameLookup[dimIds[0]]; - if(dimIds.length == 0) return null; - throw new UnsupportedOperationException( - "makeObjectColumnSelector does not support multivalued columns" - ); - } - }; - } - - return null; - } - }; - } - } - ), - new Runnable() - { - @Override - public void run() - { - for (Object object : columnHolderCache.values()) { - if (object instanceof Closeable) { - Closeables.closeQuietly((Closeable) object); - } - } - } - } - ); - } - }; - } - - @Override - public Indexed getAvailableDimensions() - { - return new ArrayIndexed(index.dimensions, String.class); - } - - @Override - public Indexed getDimValueLookup(String dimension) - { - return new ListIndexed( - Lists.newArrayList(index.dimIdLookup.get(dimension.toLowerCase()).keySet()), String.class - ); - } - - @Override - public ImmutableConciseSet getInvertedIndex(String dimension, String dimVal) - { - return index.getInvertedIndex(dimension.toLowerCase(), dimVal); - } - - @Override - public Offset getFilterOffset(Filter filter) - { - return new ConciseOffset(filter.goConcise(new IndexBasedBitmapIndexSelector(index))); - } - - @Override - public Capabilities getCapabilities() - { - return capabilities; - } - - private boolean isReverseDimSorted() - { - for (Map.Entry entry : index.reverseDimLookup.entrySet()) { - String[] arr = entry.getValue(); - for (int i = 0; i < arr.length - 1; i++) { - if (arr[i].compareTo(arr[i + 1]) > 0) { - return false; - } - } - } - return true; - } - - private Pair computeTimeStartEnd(Interval interval) - { - DateTime actualIntervalStart = index.dataInterval.getStart(); - DateTime actualIntervalEnd = index.dataInterval.getEnd(); - - if (index.dataInterval.contains(interval.getStart())) { - actualIntervalStart = interval.getStart(); - } - - if (index.dataInterval.contains(interval.getEnd())) { - actualIntervalEnd = interval.getEnd(); - } - - return computeOffsets(actualIntervalStart.getMillis(), 0, actualIntervalEnd.getMillis(), index.timeOffsets.length); - } - - private Pair computeOffsets(long startMillis, int startOffset, long endMillis, int endOffset) - { - int startIndex = startOffset; - int endIndex = endOffset; - - if (index.timeOffsets[startIndex] < startMillis) { - startIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, startMillis)); - - if (startIndex >= endOffset) { - return new Pair(0, 0); - } - - while (startIndex > 0 && index.timeOffsets[startIndex - 1] == startMillis) { - --startIndex; - } - } - - if (index.timeOffsets[endIndex - 1] >= endMillis) { - endIndex = Math.abs(Arrays.binarySearch(index.timeOffsets, endMillis)); - - while (endIndex > startIndex && index.timeOffsets[endIndex - 1] == endMillis) { - --endIndex; - } - } - - return new Pair(startIndex, endIndex); - } - - private static class TimestampCheckingOffset implements Offset - { - private final Offset baseOffset; - private final long[] timestamps; - private final long threshold; - - public TimestampCheckingOffset( - Offset baseOffset, - long[] timestamps, - long threshold - ) - { - this.baseOffset = baseOffset; - this.timestamps = timestamps; - this.threshold = threshold; - } - - @Override - public int getOffset() - { - return baseOffset.getOffset(); - } - - @Override - public Offset clone() - { - return new TimestampCheckingOffset(baseOffset.clone(), timestamps, threshold); - } - - @Override - public boolean withinBounds() - { - return baseOffset.withinBounds() && timestamps[baseOffset.getOffset()] < threshold; - } - - @Override - public void increment() - { - baseOffset.increment(); - } - } - - private static class IndexBasedBitmapIndexSelector implements BitmapIndexSelector - { - private final Index index; - - public IndexBasedBitmapIndexSelector(final Index index) - { - this.index = index; - } - - @Override - public Indexed getDimensionValues(final String dimension) - { - return new Indexed() - { - private final String[] dimVals = index.reverseDimLookup.get(dimension.toLowerCase()); - - @Override - public Class getClazz() - { - return String.class; - } - - @Override - public int size() - { - return dimVals.length; - } - - @Override - public String get(int index) - { - return dimVals[index]; - } - - @Override - public int indexOf(String value) - { - return Arrays.binarySearch(dimVals, value); - } - - @Override - public Iterator iterator() - { - return Arrays.asList(dimVals).iterator(); - } - }; - } - - @Override - public int getNumRows() - { - return index.timeOffsets.length; - } - - @Override - public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value) - { - return index.getInvertedIndex(dimension.toLowerCase(), value); - } - } -} From 50998671aef1a3eec35fb153e9443109dadd3778 Mon Sep 17 00:00:00 2001 From: xvrl Date: Mon, 15 Apr 2013 15:39:26 -0700 Subject: [PATCH 17/19] formatting --- .../metamx/druid/index/v1/QueryableIndexStorageAdapter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 2f41b0d08a0..d80e44b4b9b 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -939,7 +939,9 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter } } - if(cachedColumnVals != null) objectColumnCache.put(columnName, cachedColumnVals); + if(cachedColumnVals != null) { + objectColumnCache.put(columnName, cachedColumnVals); + } } if (cachedColumnVals == null) { From db2f3aa761f79b3146cb532b631ad9c6d795d776 Mon Sep 17 00:00:00 2001 From: xvrl Date: Tue, 16 Apr 2013 09:53:34 -0700 Subject: [PATCH 18/19] fix pom versions --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- server/pom.xml | 2 +- 11 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index ea4c4259adb..194a5fd2753 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index 6e093648e2a..73cd8ead7ab 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 399dfc51524..694e33c8fb9 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 27b0eef39d2..affa586a8d6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 6569b377f42..14680cc229d 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index f0bb40cff2d..eb498f783de 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 1a6458a8809..5f4e06d9a0b 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index fe231aa63ca..5583e098d2c 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 5d3d9eebdf6..10831986e44 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 27f4a264693..5daabd0e8ac 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT druid druid diff --git a/server/pom.xml b/server/pom.xml index 9fb76b0be13..8cf6c507715 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.39-SNAPSHOT + 0.4.0-SNAPSHOT From cc30dfad2f4dd3eb634d673ad667744f766e4169 Mon Sep 17 00:00:00 2001 From: cheddar Date: Fri, 19 Apr 2013 10:14:17 -0500 Subject: [PATCH 19/19] Test commit to test contributions --- some_file.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 some_file.txt diff --git a/some_file.txt b/some_file.txt new file mode 100644 index 00000000000..e69de29bb2d