From 83f4130b5f7930efa3c5059f2adeebf0adfcbeab Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 22 Dec 2015 01:21:56 -0800 Subject: [PATCH] SegmentMetadataQuery merging fixes. - Fix merging when the INTERVALS analysisType is disabled, and add a test. - Remove transformFn from CombiningSequence, use MappedSequence instead. transformFn did not work for "accumulate" anyway, which made the tests wrong (the intervals should have been condensed, but were not). - Add analysisTypes to the Druids segmentMetadataQuery builder to make testing simpler. --- .../druid/common/guava/CombiningSequence.java | 13 +--- .../common/guava/CombiningSequenceTest.java | 3 +- .../src/main/java/io/druid/query/Druids.java | 68 ++++++++++++------- .../druid/query/ResultMergeQueryRunner.java | 2 +- .../druid/query/metadata/SegmentAnalyzer.java | 27 +++++--- .../SegmentMetadataQueryQueryToolChest.java | 42 ++++++------ .../metadata/SegmentMetadataQueryTest.java | 61 +++++++++++++++-- 7 files changed, 143 insertions(+), 73 deletions(-) diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index 7b463a67f2e..ceb714e43ae 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -38,29 +38,25 @@ public class CombiningSequence implements Sequence public static CombiningSequence create( Sequence baseSequence, Ordering ordering, - BinaryFn mergeFn, - Function transformFn + BinaryFn mergeFn ) { - return new CombiningSequence(baseSequence, ordering, mergeFn, transformFn); + return new CombiningSequence(baseSequence, ordering, mergeFn); } private final Sequence baseSequence; private final Ordering ordering; private final BinaryFn mergeFn; - private final Function transformFn; public CombiningSequence( Sequence baseSequence, Ordering ordering, - BinaryFn mergeFn, - Function transformFn + BinaryFn mergeFn ) { 
this.baseSequence = baseSequence; this.ordering = ordering; this.mergeFn = mergeFn; - this.transformFn = transformFn; } @Override @@ -122,9 +118,6 @@ public class CombiningSequence implements Sequence @Override public OutType get() { - if (transformFn != null) { - return (OutType) transformFn.apply(retVal); - } return retVal; } diff --git a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java index 2485e68436b..c280f15d991 100644 --- a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java +++ b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java @@ -214,8 +214,7 @@ public class CombiningSequenceTest return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs); } - }, - null + } ); List> merged = Sequences.toList(seq, Lists.>newArrayList()); diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java index d001a855eda..56e0a21b805 100644 --- a/processing/src/main/java/io/druid/query/Druids.java +++ b/processing/src/main/java/io/druid/query/Druids.java @@ -55,6 +55,8 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -79,9 +81,9 @@ public class Druids /** * A Builder for AndDimFilter. - * + *

* Required: fields() must be called before build() - * + *

* Usage example: *


    *   AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
@@ -125,9 +127,9 @@ public class Druids
 
   /**
    * A Builder for OrDimFilter.
-   *
+   * 

* Required: fields() must be called before build() - * + *

* Usage example: *


    *   OrDimFilter orDimFilter = Druids.newOrDimFilterBuilder()
@@ -180,9 +182,9 @@ public class Druids
 
   /**
    * A Builder for NotDimFilter.
-   *
+   * 

* Required: field() must be called before build() - * + *

* Usage example: *


    *   NotDimFilter notDimFilter = Druids.newNotDimFilterBuilder()
@@ -226,9 +228,9 @@ public class Druids
 
   /**
    * A Builder for SelectorDimFilter.
-   *
+   * 

* Required: dimension() and value() must be called before build() - * + *

* Usage example: *


    *   Selector selDimFilter = Druids.newSelectorDimFilterBuilder()
@@ -305,10 +307,10 @@ public class Druids
 
   /**
    * A Builder for TimeseriesQuery.
-   *
+   * 

* Required: dataSource(), intervals(), and aggregators() must be called before build() * Optional: filters(), granularity(), postAggregators(), and context() can be called before build() - * + *

* Usage example: *


    *   TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@@ -503,11 +505,11 @@ public class Druids
 
   /**
    * A Builder for SearchQuery.
-   *
+   * 

* Required: dataSource(), intervals(), dimensions() and query() must be called before build() - * + *

* Optional: filters(), granularity(), and context() can be called before build() - * + *

* Usage example: *


    *   SearchQuery query = Druids.newSearchQueryBuilder()
@@ -738,9 +740,9 @@ public class Druids
 
   /**
    * A Builder for TimeBoundaryQuery.
-   *
+   * 

* Required: dataSource() must be called before build() - * + *

* Usage example: *


    *   TimeBoundaryQuery query = new MaxTimeQueryBuilder()
@@ -834,9 +836,9 @@ public class Druids
 
   /**
    * A Builder for Result.
-   *
+   * 

* Required: timestamp() and value() must be called before build() - * + *

* Usage example: *


    *   Result<T> result = Druids.newResultBuilder()
@@ -900,9 +902,9 @@ public class Druids
 
   /**
    * A Builder for SegmentMetadataQuery.
-   *
+   * 

* Required: dataSource(), intervals() must be called before build() - * + *

* Usage example: *


    *   SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
@@ -918,6 +920,7 @@ public class Druids
     private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
     private ColumnIncluderator toInclude;
+    private EnumSet analysisTypes;
     private Boolean merge;
     private Map context;
 
@@ -926,6 +929,7 @@ public class Druids
       dataSource = null;
       querySegmentSpec = null;
       toInclude = null;
+      analysisTypes = null;
       merge = null;
       context = null;
     }
@@ -938,17 +942,22 @@ public class Druids
           toInclude,
           merge,
           context,
-          null,
+          analysisTypes,
           false
       );
     }
 
     public SegmentMetadataQueryBuilder copy(SegmentMetadataQueryBuilder builder)
     {
+      final SegmentMetadataQuery.AnalysisType[] analysisTypesArray =
+          analysisTypes != null
+          ? analysisTypes.toArray(new SegmentMetadataQuery.AnalysisType[analysisTypes.size()])
+          : null;
       return new SegmentMetadataQueryBuilder()
           .dataSource(builder.dataSource)
           .intervals(builder.querySegmentSpec)
           .toInclude(toInclude)
+          .analysisTypes(analysisTypesArray)
           .merge(merge)
           .context(builder.context);
     }
@@ -989,6 +998,17 @@ public class Druids
       return this;
     }
 
+    public SegmentMetadataQueryBuilder analysisTypes(SegmentMetadataQuery.AnalysisType... analysisTypes)
+    {
+      if (analysisTypes == null) {
+        this.analysisTypes = null;
+      } else {
+        this.analysisTypes = analysisTypes.length == 0
+                             ? EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class)
+                             : EnumSet.copyOf(Arrays.asList(analysisTypes));
+      }
+      return this;
+    }
 
     public SegmentMetadataQueryBuilder merge(boolean merge)
     {
@@ -1010,9 +1030,9 @@ public class Druids
 
   /**
    * A Builder for SelectQuery.
-   *
+   * 

* Required: dataSource(), intervals() must be called before build() - * + *

* Usage example: *


    *   SelectQuery query = new SelectQueryBuilder()
@@ -1164,9 +1184,9 @@ public class Druids
 
   /**
    * A Builder for DataSourceMetadataQuery.
-   *
+   * 

* Required: dataSource() must be called before build() - * + *

* Usage example: *


    *   DataSourceMetadataQueryBuilder query = new DataSourceMetadataQueryBuilder()
diff --git a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
index 76a39a83a99..a52e7579125 100644
--- a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
+++ b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java
@@ -40,7 +40,7 @@ public abstract class ResultMergeQueryRunner extends BySegmentSkippingQueryRu
   @Override
   public Sequence doRun(QueryRunner baseRunner, Query query, Map context)
   {
-    return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null);
+    return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
   }
 
   protected abstract Ordering makeOrdering(Query query);
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java
index 57cc7a934f4..cf59db44d28 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java
@@ -59,7 +59,10 @@ public class SegmentAnalyzer
    */
   private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
 
-  public Map analyze(QueryableIndex index, EnumSet analysisTypes)
+  public Map analyze(
+      QueryableIndex index,
+      EnumSet analysisTypes
+  )
   {
     Preconditions.checkNotNull(index, "Index cannot be null");
 
@@ -100,7 +103,10 @@ public class SegmentAnalyzer
     return columns;
   }
 
-  public Map analyze(StorageAdapter adapter, EnumSet analysisTypes)
+  public Map analyze(
+      StorageAdapter adapter,
+      EnumSet analysisTypes
+  )
   {
     Preconditions.checkNotNull(adapter, "Adapter cannot be null");
     Map columns = Maps.newTreeMap();
@@ -174,7 +180,11 @@ public class SegmentAnalyzer
     return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT, analysisTypes);
   }
 
-  private ColumnAnalysis lengthBasedAnalysis(Column column, final int numBytes, EnumSet analysisTypes)
+  private ColumnAnalysis lengthBasedAnalysis(
+      Column column,
+      final int numBytes,
+      EnumSet analysisTypes
+  )
   {
     final ColumnCapabilities capabilities = column.getCapabilities();
     if (capabilities.hasMultipleValues()) {
@@ -273,16 +283,13 @@ public class SegmentAnalyzer
     );
   }
 
-  private boolean analysisHasSize(EnumSet analysisTypes) {
+  private boolean analysisHasSize(EnumSet analysisTypes)
+  {
     return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
   }
 
-  private boolean analysisHasCardinality(EnumSet analysisTypes) {
+  private boolean analysisHasCardinality(EnumSet analysisTypes)
+  {
     return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
   }
-
-  private boolean analysisHasInterva(EnumSet analysisTypes) {
-    return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL);
-  }
-
 }
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index b6d9fd99a49..c20e03ec67b 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
+import com.metamx.common.guava.MappedSequence;
 import com.metamx.common.guava.MergeSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.nary.BinaryFn;
@@ -63,6 +64,20 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest MERGE_TRANSFORM_FN = new Function()
+  {
+    @Override
+    public SegmentAnalysis apply(SegmentAnalysis analysis)
+    {
+      return new SegmentAnalysis(
+          analysis.getId(),
+          analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null,
+          analysis.getColumns(),
+          analysis.getSize(),
+          analysis.getNumRows()
+      );
+    }
+  };
 
   private final SegmentMetadataQueryConfig config;
 
@@ -79,21 +94,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest(runner)
     {
-      private Function transformFn = new Function()
-      {
-        @Override
-        public SegmentAnalysis apply(SegmentAnalysis analysis)
-        {
-          return new SegmentAnalysis(
-              analysis.getId(),
-              JodaUtils.condenseIntervals(analysis.getIntervals()),
-              analysis.getColumns(),
-              analysis.getSize(),
-              analysis.getNumRows()
-          );
-        }
-      };
-
       @Override
       public Sequence doRun(
           QueryRunner baseRunner,
@@ -101,11 +101,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest context
       )
       {
-        return CombiningSequence.create(
-            baseRunner.run(query, context),
-            makeOrdering(query),
-            createMergeFn(query),
-            transformFn
+        return new MappedSequence<>(
+            CombiningSequence.create(
+                baseRunner.run(query, context),
+                makeOrdering(query),
+                createMergeFn(query)
+            ),
+            MERGE_TRANSFORM_FN
         );
       }
 
diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
index 671b3bbf4eb..5a2cfd5f72a 100644
--- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
+++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.guava.Sequences;
 import io.druid.common.utils.JodaUtils;
 import io.druid.jackson.DefaultObjectMapper;
@@ -89,6 +90,7 @@ public class SegmentMetadataQueryTest
                       .dataSource("testing")
                       .intervals("2013/2014")
                       .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+                      .analysisTypes(null)
                       .merge(true)
                       .build();
 
@@ -127,10 +129,7 @@ public class SegmentMetadataQueryTest
   {
     SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
         "merged",
-        ImmutableList.of(
-            expectedSegmentAnalysis.getIntervals().get(0),
-            expectedSegmentAnalysis.getIntervals().get(0)
-        ),
+        ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
         ImmutableMap.of(
             "placement",
             new ColumnAnalysis(
@@ -151,7 +150,7 @@ public class SegmentMetadataQueryTest
     QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
             factory.mergeRunners(
-                Executors.newCachedThreadPool(),
+                MoreExecutors.sameThreadExecutor(),
                 Lists.>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
             )
         ),
@@ -169,6 +168,56 @@ public class SegmentMetadataQueryTest
     exec.shutdownNow();
   }
 
+  @Test
+  public void testSegmentMetadataQueryWithNoAnalysisTypesMerge()
+  {
+    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
+        "merged",
+        null,
+        ImmutableMap.of(
+            "placement",
+            new ColumnAnalysis(
+                ValueType.STRING.toString(),
+                0,
+                0,
+                null
+            )
+        ),
+        0,
+        expectedSegmentAnalysis.getNumRows()*2
+    );
+
+    QueryToolChest toolChest = factory.getToolchest();
+
+    QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
+    ExecutorService exec = Executors.newCachedThreadPool();
+    QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(
+                MoreExecutors.sameThreadExecutor(),
+                Lists.>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
+            )
+        ),
+        toolChest
+    );
+
+    TestHelper.assertExpectedObjects(
+        ImmutableList.of(mergedSegmentAnalysis),
+        myRunner.run(
+            Druids.newSegmentMetadataQueryBuilder()
+                  .dataSource("testing")
+                  .intervals("2013/2014")
+                  .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+                  .analysisTypes()
+                  .merge(true)
+                  .build(),
+            Maps.newHashMap()
+        ),
+        "failed SegmentMetadata merging query"
+    );
+    exec.shutdownNow();
+  }
+
   @Test
   public void testBySegmentResults()
   {
@@ -188,7 +237,7 @@ public class SegmentMetadataQueryTest
     QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
             factory.mergeRunners(
-                Executors.newCachedThreadPool(),
+                MoreExecutors.sameThreadExecutor(),
                 //Note: It is essential to have atleast 2 query runners merged to reproduce the regression bug described in
                 //https://github.com/druid-io/druid/pull/1172
                 //the bug surfaces only when ordering is used which happens only when you have 2 things to compare