Merge branch 'master' into igalDruid

Igal Levy 2014-02-20 09:54:04 -08:00
commit 0e3224bd1e
14 changed files with 1129 additions and 193 deletions

View File

@ -0,0 +1,137 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = exec;
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
}
@Override
public Sequence<Row> run(final Query<Row> queryParam)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get()
);
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
List<Future<Boolean>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, Future<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new PrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);
// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
}
}

View File

@ -0,0 +1,99 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import javax.annotation.Nullable;
import java.util.List;
public class GroupByQueryHelper
{
public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config
)
{
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
);
IncrementalIndex index = new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
);
Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
return accumulated;
}
};
return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
}
}

View File

@ -24,10 +24,9 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.Sequence;
@ -35,21 +34,16 @@ import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
/**
@ -61,7 +55,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
private final Supplier<GroupByQueryConfig> configSupplier;
@Inject
@ -92,60 +85,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{
final GroupByQueryConfig config = configSupplier.get();
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config
);
final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
),
new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
return accumulated;
}
}
);
// convert millis back to timestamp according to granularity to preserve time zone information
Sequence<Row> retVal = Sequences.map(
IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
Sequence<Row> sequence = Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@ -153,12 +99,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
return new MapBasedRow(
query.getGranularity()
.toDateTime(row.getTimestampFromEpoch()),
row.getEvent()
);
}
}
);
return query.applyLimit(retVal);
return query.applyLimit(sequence);
}
@Override

View File

@ -30,8 +30,8 @@ import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.ConcatQueryRunner;
import io.druid.query.GroupByParallelQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
@ -116,9 +116,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
}
)
);
}
else {
return new ChainedExecutionQueryRunner<Row>(queryExecutor, new RowOrdering(), queryRunners);
} else {
return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners);
}
}
@ -142,7 +141,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupB
@Override
public Sequence<Row> run(Query<Row> input)
{
if (! (input instanceof GroupByQuery)) {
if (!(input instanceof GroupByQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
}

View File

@ -62,6 +62,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -69,11 +70,9 @@ public class IncrementalIndex implements Iterable<Row>
{
private static final Logger log = new Logger(IncrementalIndex.class);
private static final Joiner JOINER = Joiner.on(",");
private final long minTimestamp;
private final QueryGranularity gran;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
@ -83,10 +82,8 @@ public class IncrementalIndex implements Iterable<Row>
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues;
private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;
private volatile int numEntries = 0;
// This is modified on add() by a (hopefully) single thread.
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private InputRow in;
public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema)
@ -162,15 +159,22 @@ public class IncrementalIndex implements Iterable<Row>
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
final Integer index = dimensionOrder.get(dimension);
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
synchronized (dimensionOrder) {
index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
@ -188,118 +192,128 @@ public class IncrementalIndex implements Iterable<Row>
TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
in = row;
Aggregator[] aggs = facts.get(key);
if (aggs == null) {
aggs = new Aggregator[metrics.length];
for (int i = 0; i < metrics.length; ++i) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
aggs[i] =
agg.factorize(
new ColumnSelectorFactory()
{
@Override
public long getTimestamp()
public TimestampColumnSelector makeTimestampColumnSelector()
{
return in.getTimestampFromEpoch();
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.getFloatMetric(metricName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
return new TimestampColumnSelector()
{
return Float.TYPE;
@Override
public long getTimestamp()
{
return in.getTimestampFromEpoch();
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.getFloatMetric(metricName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
public Class classOfObject()
{
return Float.TYPE;
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
}
};
}
@Override
public Float get()
{
return in.getFloatMetric(columnName);
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
final ComplexMetricExtractor extractor = serde.getExtractor();
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
@Override
public Object get()
{
return extractor.extractValue(in, columnName);
}
};
}
@Override
public Object get()
public DimensionSelector makeDimensionSelector(String dimension)
{
return extractor.extractValue(in, columnName);
// we should implement this, but this is going to be rewritten soon anyways
throw new UnsupportedOperationException(
"Incremental index aggregation does not support dimension selectors"
);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(String dimension) {
// we should implement this, but this is going to be rewritten soon anyways
throw new UnsupportedOperationException("Incremental index aggregation does not support dimension selectors");
}
}
);
);
}
facts.put(key, aggs);
++numEntries;
Aggregator[] prev = facts.putIfAbsent(key, aggs);
if (prev != null) {
aggs = prev;
}
numEntries.incrementAndGet();
}
for (Aggregator agg : aggs) {
agg.aggregate();
synchronized (this) {
in = row;
for (Aggregator agg : aggs) {
agg.aggregate();
}
in = null;
}
in = null;
return numEntries;
return numEntries.get();
}
public boolean isEmpty()
{
return numEntries == 0;
return numEntries.get() == 0;
}
public int size()
{
return numEntries;
return numEntries.get();
}
public long getMinTimeMillis()
@ -585,7 +599,6 @@ public class IncrementalIndex implements Iterable<Row>
private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
public DimDim()

View File

@ -366,3 +366,55 @@
year = {2013},
howpublished = "\url{http://www.elasticseach.com/}"
}
@book{oehler2012ibm,
title={IBM Cognos TM1: The Official Guide},
author={Oehler, Karsten and Gruenes, Jochen and Ilacqua, Christopher and Perez, Manuel},
year={2012},
publisher={McGraw-Hill}
}
@book{schrader2009oracle,
title={Oracle Essbase \& Oracle OLAP},
author={Schrader, Michael and Vlamis, Dan and Nader, Mike and Claterbos, Chris and Collins, Dave and Campbell, Mitch and Conrad, Floyd},
year={2009},
publisher={McGraw-Hill, Inc.}
}
@book{lachev2005applied,
title={Applied Microsoft Analysis Services 2005: And Microsoft Business Intelligence Platform},
author={Lachev, Teo},
year={2005},
publisher={Prologika Press}
}
@article{o1996log,
title={The log-structured merge-tree (LSM-tree)},
author={O'Neil, Patrick and Cheng, Edward and Gawlick, Dieter and O'Neil, Elizabeth},
journal={Acta Informatica},
volume={33},
number={4},
pages={351--385},
year={1996},
publisher={Springer}
}
@inproceedings{o1997improved,
title={Improved query performance with variant indexes},
author={O'Neil, Patrick and Quass, Dallan},
booktitle={ACM Sigmod Record},
volume={26},
number={2},
pages={38--49},
year={1997},
organization={ACM}
}
@inproceedings{cipar2012lazybase,
title={LazyBase: trading freshness for performance in a scalable database},
author={Cipar, James and Ganger, Greg and Keeton, Kimberly and Morrey III, Charles B and Soules, Craig AN and Veitch, Alistair},
booktitle={Proceedings of the 7th ACM european conference on Computer Systems},
pages={169--182},
year={2012},
organization={ACM}
}

Binary file not shown.

View File

@ -76,12 +76,13 @@ could be fully leveraged for our requirements.
We ended up creating Druid, an open-source, distributed, column-oriented,
realtime analytical data store. In many ways, Druid shares similarities with
other interactive query systems \cite{melnik2010dremel}, main-memory databases
\cite{farber2012sap}, and widely-known distributed data stores such as BigTable
\cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra
\cite{lakshman2010cassandra}. The distribution and query model also
borrow ideas from current generation search infrastructure
\cite{linkedin2013senseidb, apache2013solr, banon2013elasticsearch}.
other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
interactive query systems \cite{melnik2010dremel}, main-memory databases
\cite{farber2012sap}, and widely-known distributed data stores
\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The
distribution and query model also borrow ideas from current generation search
infrastructure \cite{linkedin2013senseidb, apache2013solr,
banon2013elasticsearch}.
This paper describes the architecture of Druid, explores the various design
decisions made in creating an always-on production system that powers a hosted
@ -202,13 +203,14 @@ Zookeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. Druid virtually behaves as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap overflow
problems, real-time nodes persist their in-memory indexes to disk either
periodically or after some maximum row limit is reached. This persist process
converts data stored in the in-memory buffer to a column oriented storage
format described in Section \ref{sec:storage-format}. Each persisted index is immutable and
real-time nodes load persisted indexes into off-heap memory such that they can
still be queried. Figure~\ref{fig:realtime_flow} illustrates the process.
for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk
either periodically or after some maximum row limit is reached. This persist
process converts data stored in the in-memory buffer to a column oriented
storage format described in Section \ref{sec:storage-format}. Each persisted
index is immutable and real-time nodes load persisted indexes into off-heap
memory such that they can still be queried. This process is described in detail
in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
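
To make this more concrete, the following is a minimal, self-contained Java sketch of the buffer-then-persist idea described above; it is not Druid's actual real-time code, and the class and method names (RealtimeBufferSketch, persist, maxRowsInMemory) are illustrative assumptions. Events accumulate in a heap-based buffer that remains queryable; once the row limit is reached, the buffer is snapshotted into an immutable structure (standing in for the column-oriented segment format) and cleared.

import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates "buffer in memory, persist on a row limit" as described
// in the paragraph above. Druid's real implementation persists column-oriented
// segments to disk and loads them off-heap; here a copied list stands in for that.
public class RealtimeBufferSketch
{
  private final int maxRowsInMemory;
  private final List<String> inMemoryBuffer = new ArrayList<>();
  private final List<List<String>> persistedSegments = new ArrayList<>();

  public RealtimeBufferSketch(int maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
  }

  // Incoming events are appended to the in-memory buffer and are immediately queryable.
  public synchronized void add(String event)
  {
    inMemoryBuffer.add(event);
    if (inMemoryBuffer.size() >= maxRowsInMemory) {
      persist();
    }
  }

  // Snapshot the buffer into an immutable "segment" and clear the heap-based buffer.
  private void persist()
  {
    persistedSegments.add(new ArrayList<>(inMemoryBuffer));
    inMemoryBuffer.clear();
  }

  // Queries see both the current buffer and all previously persisted snapshots.
  public synchronized List<String> queryableRows()
  {
    List<String> rows = new ArrayList<>();
    for (List<String> segment : persistedSegments) {
      rows.addAll(segment);
    }
    rows.addAll(inMemoryBuffer);
    return rows;
  }
}
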
\begin{figure}
\centering
@ -602,20 +604,22 @@ the two arrays.
\end{figure}
This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Bitmap compression algorithms are a well-defined area
of research and often utilize run-length encoding. Popular algorithms include
Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, Word-Aligned Hybrid (WAH)
code \cite{wu2006optimizing}, and Partitioned Word-Aligned Hybrid (PWAH)
compression \cite{van2011memory}. Druid opted to use the Concise algorithm
\cite{colantonio2010concise} as it can outperform WAH by reducing the size of
the compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot}
illustrates the number of bytes using Concise compression versus using an
integer array. The results were generated on a cc2.8xlarge system with a single
thread, 2G heap, 512m young gen, and a forced GC between each run. The data set
is a single day's worth of data collected from the Twitter garden hose
\cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
dimensions of varying cardinality. As an additional comparison, we also
resorted the data set rows to maximize compression.
used in search engines. The use of bitmap indices for OLAP workloads is
described in detail in \cite{o1997improved}. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length encoding. Popular
algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte},
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned
Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use
the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
reducing the size of the compressed bitmaps by up to 50\%.
Figure~\ref{fig:concise_plot} illustrates the number of bytes using Concise
compression versus using an integer array. The results were generated on a
cc2.8xlarge system with a single thread, 2G heap, 512m young gen, and a forced
GC between each run. The data set is a single day's worth of data collected
from the Twitter garden hose \cite{twitter2013} data stream. The data set
contains 2,272,295 rows and 12 dimensions of varying cardinality. As an
additional comparison, we also resorted the data set rows to maximize
compression.
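
As a concrete illustration of these Boolean operations, the sketch below uses java.util.BitSet as a stand-in for the Concise-compressed bitmaps Druid actually stores; the dimension values and row offsets are made up for the example. Each dimension value maps to a bitmap of the row offsets containing it, so a conjunctive filter such as city = 'SF' AND device = 'mobile' reduces to a single bitwise AND.

import java.util.BitSet;

// Sketch only: BitSet is uncompressed, whereas Druid uses Concise-compressed bitmaps.
public class BitmapFilterSketch
{
  public static void main(String[] args)
  {
    // bitmap of row offsets for the dimension value city = 'SF'
    BitSet citySf = new BitSet();
    citySf.set(0);
    citySf.set(2);
    citySf.set(5);

    // bitmap of row offsets for the dimension value device = 'mobile'
    BitSet deviceMobile = new BitSet();
    deviceMobile.set(2);
    deviceMobile.set(3);
    deviceMobile.set(5);

    // city = 'SF' AND device = 'mobile' -> rows {2, 5}
    BitSet matching = (BitSet) citySf.clone();
    matching.and(deviceMobile);
    System.out.println("matching rows: " + matching);
  }
}
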
In the unsorted case, the total Concise size was 53,451,144 bytes and the total
integer array size was 127,248,520 bytes. Overall, Concise compressed sets are
@ -887,7 +891,7 @@ algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{stonebraker2005c} and do not
designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same data warehousing issues that Druid
is meant to solve. These systems include in-memory databases such as
@ -896,6 +900,13 @@ stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}; however,
Druid allows system-wide rolling software updates with no downtime.
Druid is similar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
two subsystems: a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in the real-time nodes. Real-time nodes are designed
to ingest a high volume of append-heavy data and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low-latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
\cite{zaharia2012discretized}; however, both systems are focused on stream
@ -916,7 +927,53 @@ of functionality as Druid, some of Druids optimization techniques such as usi
inverted indices to perform fast filters are also used in other data
stores \cite{macnicol2004sybase}.
\section{Conclusions}
\section{Druid in Production}
Druid is run in production at several organizations and is often part of a more
sophisticated data analytics stack. We've made multiple design decisions to
allow for ease of use, deployment, and monitoring.
\subsection{Operational Monitoring}
Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system-level data such as CPU usage, available
memory, and disk capacity; JVM statistics such as garbage collection time and
heap usage; or node-specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also
emit metrics about the details of the query, such as the number of filters
applied or the interval of data requested.
Metrics can be emitted from a production Druid cluster into a dedicated metrics
Druid cluster. Queries can be made to the metrics Druid cluster to explore
production cluster performance and stability. Leveraging a dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are
most often doing, and we use this information to drive which optimizations we
should implement.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}. A
Storm topology consumes events from a data stream, retains only those that are
``on-time'', and applies any relevant business logic. This could range from
simple transformations, such as id-to-name lookups, up to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries on top of both real-time and
historical data.
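
The sketch below shows, in plain Java with no Storm or Druid dependencies, the shape of the processing step described above: drop events that arrive outside an on-time window, apply a simple id-to-name lookup, and hand the denormalized event to a downstream sink that stands in for Druid's real-time ingestion. All names and the window logic are illustrative assumptions, not Storm's or Druid's APIs.

import java.util.Map;
import java.util.function.Consumer;

// Sketch only: in production this logic would live inside a Storm bolt and the
// sink would be Druid's real-time ingestion, neither of which is shown here.
public class StreamEnricherSketch
{
  private final long windowMillis;          // how late an event may arrive and still be "on-time"
  private final Map<Long, String> idToName; // lookup table used for enrichment
  private final Consumer<String> druidSink; // stands in for forwarding to a real-time node

  public StreamEnricherSketch(long windowMillis, Map<Long, String> idToName, Consumer<String> druidSink)
  {
    this.windowMillis = windowMillis;
    this.idToName = idToName;
    this.druidSink = druidSink;
  }

  public void onEvent(long eventTimestampMillis, long advertiserId, double spend)
  {
    // retain only "on-time" events; late arrivals are dropped here
    if (System.currentTimeMillis() - eventTimestampMillis > windowMillis) {
      return;
    }
    // apply business logic, e.g. an id-to-name lookup
    String advertiserName = idToName.getOrDefault(advertiserId, "unknown");
    // forward the processed, fully denormalized event downstream
    druidSink.accept(eventTimestampMillis + "," + advertiserName + "," + spend);
  }
}
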
\subsection{Multiple Data Center Distribution}
Large-scale production outages may affect not only single nodes, but entire
data centers as well. The tier configuration in Druid coordinator nodes allows
for segments to be replicated across multiple tiers. Hence, segments can be
exactly replicated across historical nodes in multiple data centers.
Similarly, query preference can be assigned to different tiers. It is possible
to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Conclusions and Future Work}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications

View File

@ -0,0 +1,383 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Abstract implementation of a BlockingQueue bounded by the total size of its elements rather than by their count.
* It works like LinkedBlockingQueue, except that capacity is expressed as the total size in bytes of the elements
* currently in the queue, as reported by getBytesSize.
*/
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{
private final LinkedList<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private long capacity;
public BytesBoundedLinkedQueue(long capacity)
{
delegate = new LinkedList<>();
this.capacity = capacity;
}
private static void checkNotNull(Object v)
{
if (v == null) {
throw new NullPointerException();
}
}
private void checkSize(E e)
{
if (getBytesSize(e) > capacity) {
throw new IllegalArgumentException(
String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity)
);
}
}
public abstract long getBytesSize(E e);
public void elementAdded(E e)
{
currentSize.addAndGet(getBytesSize(e));
}
public void elementRemoved(E e)
{
currentSize.addAndGet(-1 * getBytesSize(e));
}
private void fullyUnlock()
{
takeLock.unlock();
putLock.unlock();
}
private void fullyLock()
{
takeLock.lock();
putLock.lock();
}
private void signalNotEmpty()
{
takeLock.lock();
try {
notEmpty.signal();
}
finally {
takeLock.unlock();
}
}
private void signalNotFull()
{
putLock.lock();
try {
notFull.signal();
}
finally {
putLock.unlock();
}
}
@Override
public int size()
{
return delegate.size();
}
@Override
public void put(E e) throws InterruptedException
{
while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
// keep trying until added successfully
}
}
@Override
public boolean offer(
E e, long timeout, TimeUnit unit
) throws InterruptedException
{
checkNotNull(e);
checkSize(e);
long nanos = unit.toNanos(timeout);
boolean added = false;
putLock.lockInterruptibly();
try {
while (currentSize.get() + getBytesSize(e) > capacity) {
if (nanos <= 0) {
return false;
}
nanos = notFull.awaitNanos(nanos);
}
delegate.add(e);
elementAdded(e);
added = true;
}
finally {
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E take() throws InterruptedException
{
E e;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
notEmpty.await();
}
e = delegate.remove();
elementRemoved(e);
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public int remainingCapacity()
{
int delegateSize = delegate.size();
long currentByteSize = currentSize.get();
// return approximate remaining capacity based on current data
if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE);
} else if (capacity > currentByteSize) {
long averageElementSize = currentByteSize / delegateSize;
return (int) ((capacity - currentByteSize) / averageElementSize);
} else {
// queue full
return 0;
}
}
@Override
public int drainTo(Collection<? super E> c)
{
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements)
{
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
int n = 0;
takeLock.lock();
try {
n = Math.min(maxElements, delegate.size());
if (n < 0) {
return 0;
}
// takeLock is held, so the first n elements are visible and safe to remove
for (int i = 0; i < n; i++) {
E e = delegate.remove(0);
elementRemoved(e);
c.add(e);
}
}
finally {
takeLock.unlock();
}
if (n > 0) {
signalNotFull();
}
return n;
}
@Override
public boolean offer(E e)
{
checkNotNull(e);
checkSize(e);
boolean added = false;
putLock.lock();
try {
if (currentSize.get() + getBytesSize(e) > capacity) {
return false;
} else {
added = delegate.add(e);
if (added) {
elementAdded(e);
}
}
}
finally {
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E poll()
{
E e = null;
takeLock.lock();
try {
e = delegate.poll();
if (e != null) {
elementRemoved(e);
}
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanos = unit.toNanos(timeout);
E e = null;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
if (nanos <= 0) {
return null;
}
nanos = notEmpty.awaitNanos(nanos);
}
e = delegate.poll();
if (e != null) {
elementRemoved(e);
}
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E peek()
{
takeLock.lock();
try {
return delegate.peek();
}
finally {
takeLock.unlock();
}
}
@Override
public Iterator<E> iterator()
{
return new Itr(delegate.iterator());
}
private class Itr implements Iterator<E>
{
private final Iterator<E> delegate;
private E lastReturned;
Itr(Iterator<E> delegate)
{
this.delegate = delegate;
}
@Override
public boolean hasNext()
{
fullyLock();
try {
return delegate.hasNext();
}
finally {
fullyUnlock();
}
}
@Override
public E next()
{
fullyLock();
try {
this.lastReturned = delegate.next();
return lastReturned;
}
finally {
fullyUnlock();
}
}
@Override
public void remove()
{
fullyLock();
try {
if (this.lastReturned == null) {
throw new IllegalStateException();
}
delegate.remove();
elementRemoved(lastReturned);
signalNotFull();
lastReturned = null;
}
finally {
fullyUnlock();
}
}
}
}
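
A concrete subclass appears later in this commit (MemcachedOperationQueueFactory bounds the queue by the bytes remaining in each operation's buffer). As an even smaller usage sketch, not part of the patch, a queue bounded by the total character count of its strings could look like this:

import io.druid.client.cache.BytesBoundedLinkedQueue;
import java.util.concurrent.BlockingQueue;

public class CharBoundedQueueExample
{
  public static void main(String[] args) throws InterruptedException
  {
    // capacity is 16 characters in total, not 16 elements
    BlockingQueue<String> queue = new BytesBoundedLinkedQueue<String>(16)
    {
      @Override
      public long getBytesSize(String s)
      {
        return s.length();
      }
    };
    queue.put("hello");   // 5 of 16 characters used
    queue.put("world!");  // 11 of 16 characters used
    System.out.println(queue.offer("overflowing")); // false: 11 more characters would exceed capacity
  }
}
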

View File

@ -56,7 +56,7 @@ public class MemcachedCache implements Cache
// always use compression
transcoder.setCompressionThreshold(0);
MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize());
return new MemcachedCache(
new MemcachedClient(
new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
@ -68,6 +68,7 @@ public class MemcachedCache implements Cache
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
.setOpQueueFactory(queueFactory)
.build(),
AddrUtil.getAddresses(config.getHosts())
),

View File

@ -27,19 +27,17 @@ public class MemcachedCacheConfig
{
@JsonProperty
private int expiration = 2592000; // 2592000 seconds = 30 days, the largest value memcached treats as a relative expiration
@JsonProperty
private int timeout = 500;
@JsonProperty
@NotNull
private String hosts;
@JsonProperty
private int maxObjectSize = 50 * 1024 * 1024;
@JsonProperty
private String memcachedPrefix = "druid";
@JsonProperty
private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB
public int getExpiration()
{
@ -65,4 +63,9 @@ public class MemcachedCacheConfig
{
return memcachedPrefix;
}
public long getMaxOperationQueueSize()
{
return maxOperationQueueSize;
}
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationQueueFactory;
import java.util.concurrent.BlockingQueue;
public class MemcachedOperationQueueFactory implements OperationQueueFactory
{
public final long maxQueueSize;
public MemcachedOperationQueueFactory(long maxQueueSize)
{
this.maxQueueSize = maxQueueSize;
}
@Override
public BlockingQueue<Operation> create()
{
return new BytesBoundedLinkedQueue<Operation>(maxQueueSize)
{
@Override
public long getBytesSize(Operation operation)
{
return operation.getBuffer().remaining();
}
};
}
}

View File

@ -0,0 +1,195 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import com.metamx.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class BytesBoundedLinkedQueueTest
{
private static int delayMS = 50;
private ExecutorService exec = Executors.newCachedThreadPool();
private static BlockingQueue<TestObject> getQueue(final int capacity)
{
return new BytesBoundedLinkedQueue<TestObject>(capacity)
{
@Override
public long getBytesSize(TestObject o)
{
return o.getSize();
}
};
}
@Test
public void testPoll() throws InterruptedException
{
final BlockingQueue q = getQueue(10);
long startTime = System.nanoTime();
Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS));
Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS);
TestObject obj = new TestObject(2);
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS));
Thread.currentThread().interrupt();
try {
q.poll(delayMS, TimeUnit.MILLISECONDS);
throw new ISE("FAIL");
}
catch (InterruptedException success) {
}
Assert.assertFalse(Thread.interrupted());
}
@Test
public void testTake() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
Thread.currentThread().interrupt();
try {
q.take();
Assert.fail();
}
catch (InterruptedException success) {
//
}
final CountDownLatch latch = new CountDownLatch(1);
final TestObject object = new TestObject(4);
Future<TestObject> future = exec.submit(
new Callable<TestObject>()
{
@Override
public TestObject call() throws Exception
{
latch.countDown();
return q.take();
}
}
);
latch.await();
// test take blocks on empty queue
try {
future.get(delayMS, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (TimeoutException success) {
}
q.offer(object);
Assert.assertEquals(object, future.get());
}
@Test
public void testOfferAndPut() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
try {
q.offer(null);
Assert.fail();
}
catch (NullPointerException success) {
}
final TestObject obj = new TestObject(2);
while (q.remainingCapacity() > 0) {
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
}
// queue full
Assert.assertEquals(0, q.remainingCapacity());
Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertFalse(q.offer(obj));
final CyclicBarrier barrier = new CyclicBarrier(2);
Future<Boolean> future = exec.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
barrier.await();
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertEquals(q.remainingCapacity(), 0);
barrier.await();
q.put(obj);
return true;
}
}
);
barrier.await();
q.take();
barrier.await();
q.take();
Assert.assertTrue(future.get());
}
@Test
public void testAddBiggerElementThanCapacityFails()
{
BlockingQueue<TestObject> q = getQueue(5);
try {
q.offer(new TestObject(10));
Assert.fail();
}
catch (IllegalArgumentException success) {
}
}
@Test
public void testAddedObjectExceedsCapacity() throws Exception
{
BlockingQueue<TestObject> q = getQueue(4);
Assert.assertTrue(q.offer(new TestObject(3)));
Assert.assertFalse(q.offer(new TestObject(2)));
Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS));
}
public static class TestObject
{
public final int size;
TestObject(int size)
{
this.size = size;
}
public int getSize()
{
return size;
}
}
}

View File

@ -34,7 +34,7 @@ import java.util.List;
)
public class CliRealtime extends ServerRunnable
{
private static final Logger log = new Logger(CliBroker.class);
private static final Logger log = new Logger(CliRealtime.class);
public CliRealtime()
{