diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java new file mode 100644 index 00000000000..fb98968fb43 --- /dev/null +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -0,0 +1,137 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.query; + +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import io.druid.data.input.Row; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryHelper; +import io.druid.segment.incremental.IncrementalIndex; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + + +public class GroupByParallelQueryRunner implements QueryRunner +{ + private static final Logger log = new Logger(GroupByParallelQueryRunner.class); + private final Iterable> queryables; + private final ExecutorService exec; + private final Ordering ordering; + private final Supplier configSupplier; + + public GroupByParallelQueryRunner( + ExecutorService exec, + Ordering ordering, + Supplier configSupplier, + QueryRunner... 
queryables + ) + { + this(exec, ordering, configSupplier, Arrays.asList(queryables)); + } + + public GroupByParallelQueryRunner( + ExecutorService exec, + Ordering ordering, Supplier configSupplier, + Iterable> queryables + ) + { + this.exec = exec; + this.ordering = ordering; + this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); + this.configSupplier = configSupplier; + } + + @Override + public Sequence run(final Query queryParam) + { + + final GroupByQuery query = (GroupByQuery) queryParam; + final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( + query, + configSupplier.get() + ); + final int priority = Integer.parseInt(query.getContextValue("priority", "0")); + + if (Iterables.isEmpty(queryables)) { + log.warn("No queryables found."); + } + List> futures = Lists.newArrayList( + Iterables.transform( + queryables, + new Function, Future>() + { + @Override + public Future apply(final QueryRunner input) + { + return exec.submit( + new PrioritizedCallable(priority) + { + @Override + public Boolean call() throws Exception + { + try { + input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + return true; + } + catch (Exception e) { + log.error(e, "Exception with one of the sequences!"); + throw Throwables.propagate(e); + } + } + } + ); + } + } + ) + ); + + // Let the runners complete + for (Future future : futures) { + try { + future.get(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); + } + +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java new file mode 100644 index 00000000000..cb1f0279d13 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -0,0 +1,99 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.query.groupby; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.Pair; +import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.data.input.Rows; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.segment.incremental.IncrementalIndex; + +import javax.annotation.Nullable; +import java.util.List; + +public class GroupByQueryHelper +{ + public static Pair> createIndexAccumulatorPair( + final GroupByQuery query, + final GroupByQueryConfig config + ) + { + final QueryGranularity gran = query.getGranularity(); + final long timeStart = query.getIntervals().get(0).getStartMillis(); + + // use gran.iterable instead of gran.truncate so that + // AllGranularity returns timeStart instead of Long.MIN_VALUE + final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); + + final List aggs = Lists.transform( + query.getAggregatorSpecs(), + new Function() + { + @Override + public AggregatorFactory apply(@Nullable AggregatorFactory input) + { + return input.getCombiningFactory(); + } + } + ); + final List dimensions = Lists.transform( + query.getDimensions(), + new Function() + { + @Override + public String apply(@Nullable DimensionSpec input) + { + return input.getOutputName(); + } + } + ); + IncrementalIndex index = new IncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]) + ); + + Accumulator accumulator = new Accumulator() + { + @Override + public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) + { + if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { + throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); + } + + return accumulated; + } + }; + return new Pair>(index, accumulator); + } + +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 9bae22699c6..2b215b8ec7d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -24,10 +24,9 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; -import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.ConcatSequence; import com.metamx.common.guava.Sequence; @@ -35,21 +34,16 @@ import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; -import io.druid.data.input.Rows; -import io.druid.granularity.QueryGranularity; import io.druid.query.IntervalChunkingQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import 
io.druid.query.QueryToolChest; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; -import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import org.joda.time.Interval; import org.joda.time.Minutes; -import javax.annotation.Nullable; -import java.util.List; import java.util.Map; /** @@ -61,7 +55,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); - private final Supplier configSupplier; @Inject @@ -92,60 +85,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeGroupByResults(final GroupByQuery query, QueryRunner runner) { final GroupByQueryConfig config = configSupplier.get(); - final QueryGranularity gran = query.getGranularity(); - final long timeStart = query.getIntervals().get(0).getStartMillis(); - - // use gran.iterable instead of gran.truncate so that - // AllGranularity returns timeStart instead of Long.MIN_VALUE - final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); - - final List aggs = Lists.transform( - query.getAggregatorSpecs(), - new Function() - { - @Override - public AggregatorFactory apply(@Nullable AggregatorFactory input) - { - return input.getCombiningFactory(); - } - } - ); - final List dimensions = Lists.transform( - query.getDimensions(), - new Function() - { - @Override - public String apply(@Nullable DimensionSpec input) - { - return input.getOutputName(); - } - } + Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( + query, + config ); - final IncrementalIndex index = runner.run(query).accumulate( - new IncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]) - ), - new Accumulator() - { - @Override - public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in) - { - if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) { - throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults()); - } - - return accumulated; - } - } - ); - - // convert millis back to timestamp according to granularity to preserve time zone information - Sequence retVal = Sequences.map( + IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + Sequence sequence = Sequences.map( Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), new Function() { @@ -153,12 +99,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest(queryExecutor, new RowOrdering(), queryRunners); + } else { + return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners); } } @@ -142,7 +141,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory run(Query input) { - if (! 
(input instanceof GroupByQuery)) { + if (!(input instanceof GroupByQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c13a1c3c588..a2f2f87ba4c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -69,11 +70,9 @@ public class IncrementalIndex implements Iterable { private static final Logger log = new Logger(IncrementalIndex.class); private static final Joiner JOINER = Joiner.on(","); - private final long minTimestamp; private final QueryGranularity gran; private final AggregatorFactory[] metrics; - private final Map metricIndexes; private final Map metricTypes; private final ImmutableList metricNames; @@ -83,10 +82,8 @@ public class IncrementalIndex implements Iterable private final SpatialDimensionRowFormatter spatialDimensionRowFormatter; private final DimensionHolder dimValues; private final ConcurrentSkipListMap facts; - - private volatile int numEntries = 0; - - // This is modified on add() by a (hopefully) single thread. + private volatile AtomicInteger numEntries = new AtomicInteger(); + // This is modified on add() in a critical section. private InputRow in; public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema) @@ -162,15 +159,22 @@ public class IncrementalIndex implements Iterable dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); - final Integer index = dimensionOrder.get(dimension); + Integer index = dimensionOrder.get(dimension); if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); + synchronized (dimensionOrder) { + index = dimensionOrder.get(dimension); + if (index == null) { + dimensionOrder.put(dimension, dimensionOrder.size()); + dimensions.add(dimension); - if (overflow == null) { - overflow = Lists.newArrayList(); + if (overflow == null) { + overflow = Lists.newArrayList(); + } + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else { + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + } } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); } else { dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); } @@ -188,118 +192,128 @@ public class IncrementalIndex implements Iterable TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - in = row; Aggregator[] aggs = facts.get(key); if (aggs == null) { aggs = new Aggregator[metrics.length]; for (int i = 0; i < metrics.length; ++i) { final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - new ColumnSelectorFactory() - { - @Override - public TimestampColumnSelector makeTimestampColumnSelector() - { - return new TimestampColumnSelector() + aggs[i] = + agg.factorize( + new ColumnSelectorFactory() { @Override - public long getTimestamp() + public TimestampColumnSelector makeTimestampColumnSelector() { - return in.getTimestampFromEpoch(); - } - }; - } - - @Override - public FloatColumnSelector 
makeFloatColumnSelector(String columnName) - { - final String metricName = columnName.toLowerCase(); - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.getFloatMetric(metricName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - final String typeName = agg.getTypeName(); - final String columnName = column.toLowerCase(); - - if (typeName.equals("float")) { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() + return new TimestampColumnSelector() { - return Float.TYPE; + @Override + public long getTimestamp() + { + return in.getTimestampFromEpoch(); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + final String metricName = columnName.toLowerCase(); + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.getFloatMetric(metricName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + if (typeName.equals("float")) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Float.TYPE; + } + + @Override + public Float get() + { + return in.getFloatMetric(columnName); + } + }; } - @Override - public Float get() - { - return in.getFloatMetric(columnName); + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); } - }; - } - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + final ComplexMetricExtractor extractor = serde.getExtractor(); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } - final ComplexMetricExtractor extractor = serde.getExtractor(); - - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); + @Override + public Object get() + { + return extractor.extractValue(in, columnName); + } + }; } @Override - public Object get() + public DimensionSelector makeDimensionSelector(String dimension) { - return extractor.extractValue(in, columnName); + // we should implement this, but this is going to be rewritten soon anyways + throw new UnsupportedOperationException( + "Incremental index aggregation does not support dimension selectors" + ); } - }; - } + } - @Override - public DimensionSelector makeDimensionSelector(String dimension) { - // we should implement this, but this is going to be rewritten soon anyways - throw new UnsupportedOperationException("Incremental index aggregation does not support dimension selectors"); - } - } - ); + ); } - facts.put(key, aggs); - ++numEntries; + Aggregator[] prev = facts.putIfAbsent(key, aggs); + if (prev != null) { + aggs = prev; + } + numEntries.incrementAndGet(); } - for (Aggregator agg : aggs) { - agg.aggregate(); + synchronized (this) { + in = row; + for (Aggregator agg : aggs) { + agg.aggregate(); + } + in = null; } - in = null; - return numEntries; + return numEntries.get(); } public boolean isEmpty() { - return numEntries == 0; + return numEntries.get() == 0; } public int size() { - return numEntries; + return numEntries.get(); } public long getMinTimeMillis() @@ -585,7 +599,6 @@ public class 
IncrementalIndex implements Iterable private final Map poorMansInterning = Maps.newConcurrentMap(); private final Map falseIds; private final Map falseIdsReverse; - private volatile String[] sortedVals = null; public DimDim() diff --git a/publications/whitepaper/druid.bib b/publications/whitepaper/druid.bib index 0a2b961eca6..22a9f24d427 100644 --- a/publications/whitepaper/druid.bib +++ b/publications/whitepaper/druid.bib @@ -366,3 +366,55 @@ year = {2013}, howpublished = "\url{http://www.elasticseach.com/}" } + +@book{oehler2012ibm, + title={IBM Cognos TM1: The Official Guide}, + author={Oehler, Karsten and Gruenes, Jochen and Ilacqua, Christopher and Perez, Manuel}, + year={2012}, + publisher={McGraw-Hill} +} + +@book{schrader2009oracle, + title={Oracle Essbase \& Oracle OLAP}, + author={Schrader, Michael and Vlamis, Dan and Nader, Mike and Claterbos, Chris and Collins, Dave and Campbell, Mitch and Conrad, Floyd}, + year={2009}, + publisher={McGraw-Hill, Inc.} +} + +@book{lachev2005applied, + title={Applied Microsoft Analysis Services 2005: And Microsoft Business Intelligence Platform}, + author={Lachev, Teo}, + year={2005}, + publisher={Prologika Press} +} + +@article{o1996log, + title={The log-structured merge-tree (LSM-tree)}, + author={O’Neil, Patrick and Cheng, Edward and Gawlick, Dieter and O’Neil, Elizabeth}, + journal={Acta Informatica}, + volume={33}, + number={4}, + pages={351--385}, + year={1996}, + publisher={Springer} +} + +@inproceedings{o1997improved, + title={Improved query performance with variant indexes}, + author={O'Neil, Patrick and Quass, Dallan}, + booktitle={ACM Sigmod Record}, + volume={26}, + number={2}, + pages={38--49}, + year={1997}, + organization={ACM} +} + +@inproceedings{cipar2012lazybase, + title={LazyBase: trading freshness for performance in a scalable database}, + author={Cipar, James and Ganger, Greg and Keeton, Kimberly and Morrey III, Charles B and Soules, Craig AN and Veitch, Alistair}, + booktitle={Proceedings of the 7th ACM european conference on Computer Systems}, + pages={169--182}, + year={2012}, + organization={ACM} +} diff --git a/publications/whitepaper/druid.pdf b/publications/whitepaper/druid.pdf index d5388e7e491..b8f30273680 100644 Binary files a/publications/whitepaper/druid.pdf and b/publications/whitepaper/druid.pdf differ diff --git a/publications/whitepaper/druid.tex b/publications/whitepaper/druid.tex index 5966037e68c..9338f51bfc6 100644 --- a/publications/whitepaper/druid.tex +++ b/publications/whitepaper/druid.tex @@ -76,12 +76,13 @@ could be fully leveraged for our requirements. We ended up creating Druid, an open-source, distributed, column-oriented, realtime analytical data store. In many ways, Druid shares similarities with -other interactive query systems \cite{melnik2010dremel}, main-memory databases -\cite{farber2012sap}, and widely-known distributed data stores such as BigTable -\cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra -\cite{lakshman2010cassandra}. The distribution and query model also -borrow ideas from current generation search infrastructure -\cite{linkedin2013senseidb, apache2013solr, banon2013elasticsearch}. +other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied}, +interactive query systems \cite{melnik2010dremel}, main-memory databases +\cite{farber2012sap}, and widely-known distributed data stores +\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. 
The +distribution and query model also borrow ideas from current generation search +infrastructure \cite{linkedin2013senseidb, apache2013solr, +banon2013elasticsearch}. This paper describes the architecture of Druid, explores the various design decisions made in creating an always-on production system that powers a hosted @@ -202,13 +203,14 @@ Zookeeper. Real-time nodes maintain an in-memory index buffer for all incoming events. These indexes are incrementally populated as new events are ingested and the indexes are also directly queryable. Druid virtually behaves as a row store -for queries on events that exist in this JVM heap-based buffer. To avoid heap overflow -problems, real-time nodes persist their in-memory indexes to disk either -periodically or after some maximum row limit is reached. This persist process -converts data stored in the in-memory buffer to a column oriented storage -format described in Section \ref{sec:storage-format}. Each persisted index is immutable and -real-time nodes load persisted indexes into off-heap memory such that they can -still be queried. Figure~\ref{fig:realtime_flow} illustrates the process. +for queries on events that exist in this JVM heap-based buffer. To avoid heap +overflow problems, real-time nodes persist their in-memory indexes to disk +either periodically or after some maximum row limit is reached. This persist +process converts data stored in the in-memory buffer to a column oriented +storage format described in Section \ref{sec:storage-format}. Each persisted +index is immutable and real-time nodes load persisted indexes into off-heap +memory such that they can still be queried. This process is described in detail +in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}. \begin{figure} \centering @@ -602,20 +604,22 @@ the two arrays. \end{figure} This approach of performing Boolean operations on large bitmap sets is commonly -used in search engines. Bitmap compression algorithms are a well-defined area -of research and often utilize run-length encoding. Popular algorithms include -Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, Word-Aligned Hybrid (WAH) -code \cite{wu2006optimizing}, and Partitioned Word-Aligned Hybrid (PWAH) -compression \cite{van2011memory}. Druid opted to use the Concise algorithm -\cite{colantonio2010concise} as it can outperform WAH by reducing the size of -the compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} -illustrates the number of bytes using Concise compression versus using an -integer array. The results were generated on a cc2.8xlarge system with a single -thread, 2G heap, 512m young gen, and a forced GC between each run. The data set -is a single day’s worth of data collected from the Twitter garden hose -\cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12 -dimensions of varying cardinality. As an additional comparison, we also -resorted the data set rows to maximize compression. +used in search engines. Bitmap indices for OLAP workloads is described in +detail in \cite{o1997improved}. Bitmap compression algorithms are a +well-defined area of research and often utilize run-length encoding. Popular +algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, +Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned +Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. 
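A minimal sketch of the bitmap intersection described above, using Java's uncompressed java.util.BitSet (not the Concise-compressed bitmaps Druid actually uses) and hypothetical dimension values; it only shows how per-value row bitmaps are ANDed to answer a two-filter query:

import java.util.BitSet;

public class BitmapFilterSketch
{
  public static void main(String[] args)
  {
    // One bitmap per (hypothetical) dimension value; bit i set means row i has that value.
    BitSet pageIsJustinBieber = new BitSet();
    pageIsJustinBieber.set(0);
    pageIsJustinBieber.set(1);

    BitSet cityIsSanFrancisco = new BitSet();
    cityIsSanFrancisco.set(1);
    cityIsSanFrancisco.set(2);

    // AND the two bitmaps to find rows satisfying both filters.
    BitSet both = (BitSet) pageIsJustinBieber.clone();
    both.and(cityIsSanFrancisco);

    System.out.println(both); // prints {1}: only row 1 matches both predicates
  }
}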
Druid opted to use +the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by +reducing the size of the compressed bitmaps by up to 50\%. +Figure~\ref{fig:concise_plot} illustrates the number of bytes using Concise +compression versus using an integer array. The results were generated on a +cc2.8xlarge system with a single thread, 2G heap, 512m young gen, and a forced +GC between each run. The data set is a single day’s worth of data collected +from the Twitter garden hose \cite{twitter2013} data stream. The data set +contains 2,272,295 rows and 12 dimensions of varying cardinality. As an +additional comparison, we also resorted the data set rows to maximize +compression. In the unsorted case, the total Concise size was 53,451,144 bytes and the total integer array size was 127,248,520 bytes. Overall, Concise compressed sets are @@ -887,7 +891,7 @@ algorithms mentioned in PowerDrill. Although Druid builds on many of the same principles as other distributed columnar data stores \cite{fink2012distributed}, many of these data stores are -designed to be more generic key-value stores \cite{stonebraker2005c} and do not +designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not support computation directly in the storage layer. There are also other data stores designed for some of the same of the data warehousing issues that Druid is meant to solve. These systems include include in-memory databases such as @@ -896,6 +900,13 @@ stores lack Druid's low latency ingestion characteristics. Druid also has native analytical features baked in, similar to \cite{paraccel2013}, however, Druid allows system wide rolling software updates with no downtime. +Druid is similar to \cite{stonebraker2005c, cipar2012lazybase} in that it has +two subsystems, a read-optimized subsystem in the historical nodes and a +write-optimized subsystem in real-time nodes. Real-time nodes are designed to +ingest a high volume of append-heavy data, and do not support data updates. +Unlike the two aforementioned systems, Druid is meant for OLAP transactions and +not OLTP transactions. + Druid's low latency data ingestion features share some similarities with Trident/Storm \cite{marz2013storm} and Streaming Spark \cite{zaharia2012discretized}, however, both systems are focused on stream @@ -916,7 +927,53 @@ of functionality as Druid, some of Druid’s optimization techniques such as usi inverted indices to perform fast filters are also used in other data stores \cite{macnicol2004sybase}. -\section{Conclusions} +\section{Druid in Production} +Druid is run in production at several organizations and is often part of a more +sophisticated data analytics stack. We've made multiple design decisions to +allow for ease of usability, deployment, and monitoring. + +\subsection{Operational Monitoring} +Each Druid node is designed to periodically emit a set of operational metrics. +These metrics may include system level data such as CPU usage, available +memory, and disk capacity, JVM statistics such as garbage collection time and +heap usage, or node specific metrics such as segment scan time, cache +hit rates, and data ingestion latencies. For each query, Druid nodes can also +emit metrics about the details of the query such as the number of filters +applied, or the interval of data requested. + +Metrics can be emitted from a production Druid cluster into a dedicated metrics +Druid cluster. Queries can be made to the metrics Druid cluster to explore +production cluster performance and stability.
Leveraging a dedicated metrics +cluster has allowed us to find numerous production problems, such as gradual +query speed degradations, less than optimally tuned hardware, and various other +system bottlenecks. We also use a metrics cluster to analyze what queries are +made in production. This analysis allows us to determine what our users are +most often doing, and we use this information to drive what optimizations we +should implement. + +\subsection{Pairing Druid with a Stream Processor} +At the time of writing, Druid can only understand fully denormalized data +streams. In order to provide full business logic in production, Druid can be +paired with a stream processor such as Apache Storm \cite{marz2013storm}. A +Storm topology consumes events from a data stream, retains only those that are +“on-time”, and applies any relevant business logic. This could range from +simple transformations, such as id to name lookups, up to complex operations +such as multi-stream joins. The Storm topology forwards the processed event +stream to Druid in real-time. Storm handles the streaming data processing work, +and Druid is used for responding to queries on top of both real-time and +historical data. + +\subsection{Multiple Data Center Distribution} +Large scale production outages may not only affect single nodes, but entire +data centers as well. The tier configuration in Druid coordinator nodes allows +for segments to be replicated across multiple tiers. Hence, segments can be +exactly replicated across historical nodes in multiple data centers. +Similarly, query preference can be assigned to different tiers. It is possible +to have nodes in one data center act as a primary cluster (and receive all +queries) and have a redundant cluster in another data center. Such a setup may +be desired if one data center is situated much closer to users. + +\section{Conclusions and Future Work} \label{sec:conclusions} In this paper, we presented Druid, a distributed, column-oriented, real-time analytical data store. Druid is designed to power high performance applications diff --git a/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java new file mode 100644 index 00000000000..84c5f83d6a2 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/BytesBoundedLinkedQueue.java @@ -0,0 +1,383 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */ + +package io.druid.client.cache; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract implementation of a BlockingQueue bounded by the size of elements, + * works similar to LinkedBlockingQueue except that capacity is bounded by the size in bytes of the elements in the queue. + */ +public abstract class BytesBoundedLinkedQueue extends AbstractQueue implements BlockingQueue +{ + private final LinkedList delegate; + private final AtomicLong currentSize = new AtomicLong(0); + private final Lock putLock = new ReentrantLock(); + private final Condition notFull = putLock.newCondition(); + private final Lock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + private long capacity; + + public BytesBoundedLinkedQueue(long capacity) + { + delegate = new LinkedList<>(); + this.capacity = capacity; + } + + private static void checkNotNull(Object v) + { + if (v == null) { + throw new NullPointerException(); + } + } + + private void checkSize(E e) + { + if (getBytesSize(e) > capacity) { + throw new IllegalArgumentException( + String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity) + ); + } + } + + public abstract long getBytesSize(E e); + + public void elementAdded(E e) + { + currentSize.addAndGet(getBytesSize(e)); + } + + public void elementRemoved(E e) + { + currentSize.addAndGet(-1 * getBytesSize(e)); + } + + private void fullyUnlock() + { + takeLock.unlock(); + putLock.unlock(); + } + + private void fullyLock() + { + takeLock.lock(); + putLock.lock(); + } + + private void signalNotEmpty() + { + takeLock.lock(); + try { + notEmpty.signal(); + } + finally { + takeLock.unlock(); + } + } + + private void signalNotFull() + { + putLock.lock(); + try { + notFull.signal(); + } + finally { + putLock.unlock(); + } + } + + @Override + public int size() + { + return delegate.size(); + } + + @Override + public void put(E e) throws InterruptedException + { + while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) { + // keep trying until added successfully + } + } + + @Override + public boolean offer( + E e, long timeout, TimeUnit unit + ) throws InterruptedException + { + checkNotNull(e); + checkSize(e); + long nanos = unit.toNanos(timeout); + boolean added = false; + putLock.lockInterruptibly(); + try { + while (currentSize.get() + getBytesSize(e) > capacity) { + if (nanos <= 0) { + return false; + } + nanos = notFull.awaitNanos(nanos); + } + delegate.add(e); + elementAdded(e); + added = true; + } + finally { + putLock.unlock(); + } + if (added) { + signalNotEmpty(); + } + return added; + + } + + @Override + public E take() throws InterruptedException + { + E e; + takeLock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + notEmpty.await(); + } + e = delegate.remove(); + elementRemoved(e); + } + finally { + takeLock.unlock(); + } + if (e != null) { + signalNotFull(); + } + return e; + } + + @Override + public int remainingCapacity() + { + int delegateSize = delegate.size(); + long currentByteSize = currentSize.get(); + // return approximate remaining capacity based on current data + if (delegateSize == 0) { + return (int) Math.min(capacity, 
Integer.MAX_VALUE); + } else if (capacity > currentByteSize) { + long averageElementSize = currentByteSize / delegateSize; + return (int) ((capacity - currentByteSize) / averageElementSize); + } else { + // queue full + return 0; + } + + } + + @Override + public int drainTo(Collection c) + { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection c, int maxElements) + { + if (c == null) { + throw new NullPointerException(); + } + if (c == this) { + throw new IllegalArgumentException(); + } + int n = 0; + takeLock.lock(); + try { + n = Math.min(maxElements, delegate.size()); + if (n < 0) { + return 0; + } + // count.get provides visibility to first n Nodes + for (int i = 0; i < n; i++) { + E e = delegate.remove(0); + elementRemoved(e); + c.add(e); + } + } + finally { + takeLock.unlock(); + } + if (n > 0) { + signalNotFull(); + } + return n; + } + + @Override + public boolean offer(E e) + { + checkNotNull(e); + checkSize(e); + boolean added = false; + putLock.lock(); + try { + if (currentSize.get() + getBytesSize(e) > capacity) { + return false; + } else { + added = delegate.add(e); + if (added) { + elementAdded(e); + } + } + } + finally { + putLock.unlock(); + } + if (added) { + signalNotEmpty(); + } + return added; + } + + @Override + public E poll() + { + E e = null; + takeLock.lock(); + try { + e = delegate.poll(); + if (e != null) { + elementRemoved(e); + } + } + finally { + takeLock.unlock(); + } + if (e != null) { + signalNotFull(); + } + return e; + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + long nanos = unit.toNanos(timeout); + E e = null; + takeLock.lockInterruptibly(); + try { + while (delegate.size() == 0) { + if (nanos <= 0) { + return null; + } + nanos = notEmpty.awaitNanos(nanos); + } + e = delegate.poll(); + if (e != null) { + elementRemoved(e); + } + } + finally { + takeLock.unlock(); + } + if (e != null) { + signalNotFull(); + } + return e; + + } + + @Override + public E peek() + { + takeLock.lock(); + try { + return delegate.peek(); + } + finally { + takeLock.unlock(); + } + } + + @Override + public Iterator iterator() + { + return new Itr(delegate.iterator()); + } + + private class Itr implements Iterator + { + + private final Iterator delegate; + private E lastReturned; + + Itr(Iterator delegate) + { + this.delegate = delegate; + } + + @Override + public boolean hasNext() + { + fullyLock(); + try { + return delegate.hasNext(); + } + finally { + fullyUnlock(); + } + } + + @Override + public E next() + { + fullyLock(); + try { + this.lastReturned = delegate.next(); + return lastReturned; + } + finally { + fullyUnlock(); + } + } + + @Override + public void remove() + { + fullyLock(); + try { + if (this.lastReturned == null) { + throw new IllegalStateException(); + } + delegate.remove(); + elementRemoved(lastReturned); + signalNotFull(); + lastReturned = null; + } + finally { + fullyUnlock(); + } + } + } +} diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCache.java b/server/src/main/java/io/druid/client/cache/MemcachedCache.java index 155c75e3f86..abbfb54139c 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCache.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCache.java @@ -56,7 +56,7 @@ public class MemcachedCache implements Cache // always use compression transcoder.setCompressionThreshold(0); - + MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize()); return new MemcachedCache( 
new MemcachedClient( new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) @@ -68,6 +68,7 @@ public class MemcachedCache implements Cache .setShouldOptimize(true) .setOpQueueMaxBlockTime(config.getTimeout()) .setOpTimeout(config.getTimeout()) + .setOpQueueFactory(queueFactory) .build(), AddrUtil.getAddresses(config.getHosts()) ), diff --git a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java index 2cc06cf3637..2d8674cdd24 100644 --- a/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java +++ b/server/src/main/java/io/druid/client/cache/MemcachedCacheConfig.java @@ -27,19 +27,17 @@ public class MemcachedCacheConfig { @JsonProperty private int expiration = 2592000; // What is this number? - @JsonProperty private int timeout = 500; - @JsonProperty @NotNull private String hosts; - @JsonProperty private int maxObjectSize = 50 * 1024 * 1024; - @JsonProperty private String memcachedPrefix = "druid"; + @JsonProperty + private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB public int getExpiration() { @@ -65,4 +63,9 @@ public class MemcachedCacheConfig { return memcachedPrefix; } + + public long getMaxOperationQueueSize() + { + return maxOperationQueueSize; + } } diff --git a/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java new file mode 100644 index 00000000000..8118d869731 --- /dev/null +++ b/server/src/main/java/io/druid/client/cache/MemcachedOperationQueueFactory.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationQueueFactory; + +import java.util.concurrent.BlockingQueue; + +public class MemcachedOperationQueueFactory implements OperationQueueFactory +{ + public final long maxQueueSize; + + public MemcachedOperationQueueFactory(long maxQueueSize) + { + this.maxQueueSize = maxQueueSize; + } + + @Override + public BlockingQueue create() + { + return new BytesBoundedLinkedQueue(maxQueueSize) + { + @Override + public long getBytesSize(Operation operation) + { + return operation.getBuffer().remaining(); + } + }; + } +} diff --git a/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java new file mode 100644 index 00000000000..67a863ff8a1 --- /dev/null +++ b/server/src/test/java/io/druid/client/cache/BytesBoundedLinkedQueueTest.java @@ -0,0 +1,195 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client.cache; + + +import com.metamx.common.ISE; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class BytesBoundedLinkedQueueTest +{ + private static int delayMS = 50; + private ExecutorService exec = Executors.newCachedThreadPool(); + + private static BlockingQueue getQueue(final int capacity) + { + return new BytesBoundedLinkedQueue(capacity) + { + @Override + public long getBytesSize(TestObject o) + { + return o.getSize(); + } + }; + } + + @Test + public void testPoll() throws InterruptedException + { + final BlockingQueue q = getQueue(10); + long startTime = System.nanoTime(); + Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS)); + Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS); + TestObject obj = new TestObject(2); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS)); + + Thread.currentThread().interrupt(); + try { + q.poll(delayMS, TimeUnit.MILLISECONDS); + throw new ISE("FAIL"); + } + catch (InterruptedException success) { + } + Assert.assertFalse(Thread.interrupted()); + } + + @Test + public void testTake() throws Exception + { + final BlockingQueue q = getQueue(10); + Thread.currentThread().interrupt(); + try { + q.take(); + Assert.fail(); + } + catch (InterruptedException success) { + // + } + final CountDownLatch latch = new CountDownLatch(1); + final TestObject object = new TestObject(4); + Future future = exec.submit( + new Callable() + { + @Override + public TestObject call() throws Exception + { + latch.countDown(); + return q.take(); + + } + } + ); + latch.await(); + // test take blocks on empty queue + try { + future.get(delayMS, TimeUnit.MILLISECONDS); + Assert.fail(); + } + catch (TimeoutException success) { + + } + + q.offer(object); + Assert.assertEquals(object, future.get()); + } + + @Test + public void testOfferAndPut() throws Exception + { + final BlockingQueue q = getQueue(10); + try { + q.offer(null); + Assert.fail(); + } + catch (NullPointerException success) { + + } + + final TestObject obj = new TestObject(2); + while (q.remainingCapacity() > 0) { + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + } + // queue full + Assert.assertEquals(0, q.remainingCapacity()); + Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertFalse(q.offer(obj)); + final 
CyclicBarrier barrier = new CyclicBarrier(2); + + Future future = exec.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + barrier.await(); + Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS)); + Assert.assertEquals(q.remainingCapacity(), 0); + barrier.await(); + q.put(obj); + return true; + } + } + ); + barrier.await(); + q.take(); + barrier.await(); + q.take(); + Assert.assertTrue(future.get()); + + } + + @Test + public void testAddBiggerElementThanCapacityFails() + { + BlockingQueue q = getQueue(5); + try { + q.offer(new TestObject(10)); + Assert.fail(); + } + catch (IllegalArgumentException success) { + + } + } + + @Test public void testAddedObjectExceedsCapacity() throws Exception { + BlockingQueue q = getQueue(4); + Assert.assertTrue(q.offer(new TestObject(3))); + Assert.assertFalse(q.offer(new TestObject(2))); + Assert.assertFalse(q.offer(new TestObject(2),delayMS, TimeUnit.MILLISECONDS)); + } + + public static class TestObject + { + public final int size; + + TestObject(int size) + { + this.size = size; + } + + public int getSize() + { + return size; + } + } +} diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index 0af21c39594..85d74f2fb79 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -34,7 +34,7 @@ import java.util.List; ) public class CliRealtime extends ServerRunnable { - private static final Logger log = new Logger(CliBroker.class); + private static final Logger log = new Logger(CliRealtime.class); public CliRealtime() {
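A minimal usage sketch of the byte-bounded queue introduced in this patch, assuming io.druid.client.cache.BytesBoundedLinkedQueue is on the classpath; it bounds the queue by total payload bytes rather than element count, mirroring how MemcachedOperationQueueFactory caps the memcached operation queue:

import io.druid.client.cache.BytesBoundedLinkedQueue;

import java.util.concurrent.BlockingQueue;

public class ByteBoundedQueueExample
{
  public static void main(String[] args) throws InterruptedException
  {
    // Capacity is 8 bytes of payload, not 8 elements.
    BlockingQueue<byte[]> queue = new BytesBoundedLinkedQueue<byte[]>(8)
    {
      @Override
      public long getBytesSize(byte[] payload)
      {
        return payload.length;
      }
    };

    System.out.println(queue.offer(new byte[6])); // true: 6 of 8 bytes used
    System.out.println(queue.offer(new byte[6])); // false: would exceed the byte budget
    queue.take();                                 // frees 6 bytes
    System.out.println(queue.offer(new byte[6])); // true again
  }
}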