Merge branch 'master' of github.com:metamx/druid

fjy committed 2014-02-20 18:17:26 -08:00
commit 699bae8e1b
11 changed files with 991 additions and 164 deletions

New file: io/druid/query/GroupByParallelQueryRunner.java (+137 lines)
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Row;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.segment.incremental.IncrementalIndex;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class GroupByParallelQueryRunner implements QueryRunner<Row>
{
private static final Logger log = new Logger(GroupByParallelQueryRunner.class);
private final Iterable<QueryRunner<Row>> queryables;
private final ExecutorService exec;
private final Ordering<Row> ordering;
private final Supplier<GroupByQueryConfig> configSupplier;
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering,
Supplier<GroupByQueryConfig> configSupplier,
QueryRunner<Row>... queryables
)
{
this(exec, ordering, configSupplier, Arrays.asList(queryables));
}
public GroupByParallelQueryRunner(
ExecutorService exec,
Ordering<Row> ordering, Supplier<GroupByQueryConfig> configSupplier,
Iterable<QueryRunner<Row>> queryables
)
{
this.exec = exec;
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
}
@Override
public Sequence<Row> run(final Query<Row> queryParam)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
configSupplier.get()
);
final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
List<Future<Boolean>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, Future<Boolean>>()
{
@Override
public Future<Boolean> apply(final QueryRunner<Row> input)
{
return exec.submit(
new PrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
input.run(queryParam).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
return true;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
);
// Let the runners complete
for (Future<Boolean> future : futures) {
try {
future.get();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
}
}
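
A side note on the design: the runner above fans every child QueryRunner out to the executor as a PrioritizedCallable and folds all of their rows into one shared IncrementalIndex. A minimal sketch of that fan-out/fold pattern (assumed names FanOutSketch and Source; an AtomicLong stands in for the shared index; not Druid code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class FanOutSketch
{
  interface Source
  {
    void pushInto(AtomicLong sharedSink);
  }

  static long runAll(ExecutorService exec, List<Source> sources)
      throws InterruptedException, ExecutionException
  {
    // one shared sink for every task, mirroring the single indexAccumulatorPair above
    final AtomicLong sharedSink = new AtomicLong();
    List<Future<?>> futures = new ArrayList<Future<?>>();
    for (final Source source : sources) {
      futures.add(
          exec.submit(
              new Runnable()
              {
                @Override
                public void run()
                {
                  source.pushInto(sharedSink);
                }
              }
          )
      );
    }
    // let the tasks complete, propagating any failure
    for (Future<?> future : futures) {
      future.get();
    }
    return sharedSink.get();
  }
}

Because every task writes into the same sink, that sink must tolerate concurrent adds; the IncrementalIndex changes later in this commit provide exactly that.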

New file: io/druid/query/groupby/GroupByQueryHelper.java (+99 lines)
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import javax.annotation.Nullable;
import java.util.List;
public class GroupByQueryHelper
{
public static Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config
)
{
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(@Nullable AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
);
final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
{
@Override
public String apply(@Nullable DimensionSpec input)
{
return input.getOutputName();
}
}
);
IncrementalIndex index = new IncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
);
Accumulator<IncrementalIndex, Row> accumulator = new Accumulator<IncrementalIndex, Row>()
{
@Override
public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
{
if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
}
return accumulated;
}
};
return new Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>>(index, accumulator);
}
}

Changed file: GroupByQueryQueryToolChest.java

@@ -24,10 +24,9 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
-import com.metamx.common.ISE;
+import com.metamx.common.Pair;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.ConcatSequence;
 import com.metamx.common.guava.Sequence;
@@ -35,21 +34,16 @@ import com.metamx.common.guava.Sequences;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
-import io.druid.data.input.Rows;
-import io.druid.granularity.QueryGranularity;
 import io.druid.query.IntervalChunkingQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryToolChest;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
-import io.druid.query.dimension.DimensionSpec;
 import io.druid.segment.incremental.IncrementalIndex;
 import org.joda.time.Interval;
 import org.joda.time.Minutes;
-import javax.annotation.Nullable;
-import java.util.List;
 import java.util.Map;

 /**
@@ -61,7 +55,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   };
   private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
   private static final Map<String, String> NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false");
-
   private final Supplier<GroupByQueryConfig> configSupplier;

   @Inject
@@ -92,60 +85,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
   private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
   {
     final GroupByQueryConfig config = configSupplier.get();
-    final QueryGranularity gran = query.getGranularity();
-    final long timeStart = query.getIntervals().get(0).getStartMillis();
-
-    // use gran.iterable instead of gran.truncate so that
-    // AllGranularity returns timeStart instead of Long.MIN_VALUE
-    final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();
-
-    final List<AggregatorFactory> aggs = Lists.transform(
-        query.getAggregatorSpecs(),
-        new Function<AggregatorFactory, AggregatorFactory>()
-        {
-          @Override
-          public AggregatorFactory apply(@Nullable AggregatorFactory input)
-          {
-            return input.getCombiningFactory();
-          }
-        }
-    );
-    final List<String> dimensions = Lists.transform(
-        query.getDimensions(),
-        new Function<DimensionSpec, String>()
-        {
-          @Override
-          public String apply(@Nullable DimensionSpec input)
-          {
-            return input.getOutputName();
-          }
-        }
-    );
-
-    final IncrementalIndex index = runner.run(query).accumulate(
-        new IncrementalIndex(
-            // use granularity truncated min timestamp
-            // since incoming truncated timestamps may precede timeStart
-            granTimeStart,
-            gran,
-            aggs.toArray(new AggregatorFactory[aggs.size()])
-        ),
-        new Accumulator<IncrementalIndex, Row>()
-        {
-          @Override
-          public IncrementalIndex accumulate(IncrementalIndex accumulated, Row in)
-          {
-            if (accumulated.add(Rows.toCaseInsensitiveInputRow(in, dimensions)) > config.getMaxResults()) {
-              throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
-            }
-            return accumulated;
-          }
-        }
-    );
-
-    // convert millis back to timestamp according to granularity to preserve time zone information
-    Sequence<Row> retVal = Sequences.map(
+    Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
+        query,
+        config
+    );
+    IncrementalIndex index = runner.run(query).accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
+
+    Sequence<Row> sequence = Sequences.map(
         Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
         new Function<Row, Row>()
         {
@@ -153,12 +99,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
           public Row apply(Row input)
           {
             final MapBasedRow row = (MapBasedRow) input;
-            return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
+            return new MapBasedRow(
+                query.getGranularity()
+                     .toDateTime(row.getTimestampFromEpoch()),
+                row.getEvent()
+            );
           }
         }
     );

-    return query.applyLimit(retVal);
+    return query.applyLimit(sequence);
   }

   @Override

Changed file: GroupByQueryRunnerFactory.java

@@ -30,8 +30,8 @@ import com.metamx.common.guava.ExecutorExecutingSequence;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import io.druid.data.input.Row;
-import io.druid.query.ChainedExecutionQueryRunner;
 import io.druid.query.ConcatQueryRunner;
+import io.druid.query.GroupByParallelQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
@@ -116,9 +116,8 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
             }
         )
     );
-    }
-    else {
-      return new ChainedExecutionQueryRunner<Row>(queryExecutor, new RowOrdering(), queryRunners);
+    } else {
+      return new GroupByParallelQueryRunner(queryExecutor, new RowOrdering(), config, queryRunners);
     }
   }
@@ -142,7 +141,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<Row, GroupByQuery>
       @Override
       public Sequence<Row> run(Query<Row> input)
       {
-        if (! (input instanceof GroupByQuery)) {
+        if (!(input instanceof GroupByQuery)) {
           throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class);
         }

Changed file: IncrementalIndex.java

@@ -62,6 +62,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  */
@@ -69,11 +70,9 @@ public class IncrementalIndex implements Iterable<Row>
 {
   private static final Logger log = new Logger(IncrementalIndex.class);
   private static final Joiner JOINER = Joiner.on(",");
-
   private final long minTimestamp;
   private final QueryGranularity gran;
   private final AggregatorFactory[] metrics;
-
   private final Map<String, Integer> metricIndexes;
   private final Map<String, String> metricTypes;
   private final ImmutableList<String> metricNames;
@@ -83,10 +82,8 @@ public class IncrementalIndex implements Iterable<Row>
   private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
   private final DimensionHolder dimValues;
   private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;
-
-  private volatile int numEntries = 0;
-  // This is modified on add() by a (hopefully) single thread.
+  private volatile AtomicInteger numEntries = new AtomicInteger();
+  // This is modified on add() in a critical section.
   private InputRow in;

   public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema)
@@ -162,15 +159,22 @@ public class IncrementalIndex implements Iterable<Row>
       dimension = dimension.toLowerCase();
       List<String> dimensionValues = row.getDimension(dimension);

-      final Integer index = dimensionOrder.get(dimension);
+      Integer index = dimensionOrder.get(dimension);
       if (index == null) {
-        dimensionOrder.put(dimension, dimensionOrder.size());
-        dimensions.add(dimension);
+        synchronized (dimensionOrder) {
+          index = dimensionOrder.get(dimension);
+          if (index == null) {
+            dimensionOrder.put(dimension, dimensionOrder.size());
+            dimensions.add(dimension);

-        if (overflow == null) {
-          overflow = Lists.newArrayList();
+            if (overflow == null) {
+              overflow = Lists.newArrayList();
+            }
+            overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
+          } else {
+            dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
+          }
         }
-        overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
       } else {
         dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
       }
@@ -188,118 +192,128 @@ public class IncrementalIndex implements Iterable<Row>
     TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);

-    in = row;
     Aggregator[] aggs = facts.get(key);
     if (aggs == null) {
       aggs = new Aggregator[metrics.length];
       for (int i = 0; i < metrics.length; ++i) {
         final AggregatorFactory agg = metrics[i];
         aggs[i] = agg.factorize(
             new ColumnSelectorFactory()
             {
               @Override
               public TimestampColumnSelector makeTimestampColumnSelector()
               {
                 return new TimestampColumnSelector()
                 {
                   @Override
                   public long getTimestamp()
                   {
                     return in.getTimestampFromEpoch();
                   }
                 };
               }

               @Override
               public FloatColumnSelector makeFloatColumnSelector(String columnName)
               {
                 final String metricName = columnName.toLowerCase();
                 return new FloatColumnSelector()
                 {
                   @Override
                   public float get()
                   {
                     return in.getFloatMetric(metricName);
                   }
                 };
               }

               @Override
               public ObjectColumnSelector makeObjectColumnSelector(String column)
               {
                 final String typeName = agg.getTypeName();
                 final String columnName = column.toLowerCase();

                 if (typeName.equals("float")) {
                   return new ObjectColumnSelector<Float>()
                   {
                     @Override
                     public Class classOfObject()
                     {
                       return Float.TYPE;
                     }

                     @Override
                     public Float get()
                     {
                       return in.getFloatMetric(columnName);
                     }
                   };
                 }

                 final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
                 if (serde == null) {
                   throw new ISE("Don't know how to handle type[%s]", typeName);
                 }

                 final ComplexMetricExtractor extractor = serde.getExtractor();
                 return new ObjectColumnSelector()
                 {
                   @Override
                   public Class classOfObject()
                   {
                     return extractor.extractedClass();
                   }

                   @Override
                   public Object get()
                   {
                     return extractor.extractValue(in, columnName);
                   }
                 };
               }

               @Override
               public DimensionSelector makeDimensionSelector(String dimension)
               {
                 // we should implement this, but this is going to be rewritten soon anyways
                 throw new UnsupportedOperationException(
                     "Incremental index aggregation does not support dimension selectors"
                 );
               }
             }
         );
       }

-      facts.put(key, aggs);
-      ++numEntries;
+      Aggregator[] prev = facts.putIfAbsent(key, aggs);
+      if (prev != null) {
+        aggs = prev;
+      }
+      numEntries.incrementAndGet();
     }

-    for (Aggregator agg : aggs) {
-      agg.aggregate();
+    synchronized (this) {
+      in = row;
+      for (Aggregator agg : aggs) {
+        agg.aggregate();
+      }
+      in = null;
     }
-    in = null;

-    return numEntries;
+    return numEntries.get();
   }

   public boolean isEmpty()
   {
-    return numEntries == 0;
+    return numEntries.get() == 0;
   }

   public int size()
   {
-    return numEntries;
+    return numEntries.get();
   }

   public long getMinTimeMillis()
@@ -585,7 +599,6 @@ public class IncrementalIndex implements Iterable<Row>
     private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
     private final Map<String, Integer> falseIds;
     private final Map<Integer, String> falseIdsReverse;
-
     private volatile String[] sortedVals = null;

     public DimDim()
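
The IncrementalIndex changes above reduce to three idioms: double-checked locking when a dimension is first seen, putIfAbsent to resolve races on the facts map, and aggregating inside a critical section because the column selectors read the shared `in` field. A stand-alone sketch of those idioms (assumed name ConcurrentAddSketch; a long[] stands in for the Aggregator array; not Druid code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentAddSketch
{
  private final Map<String, Integer> dimensionOrder = new ConcurrentHashMap<String, Integer>();
  private final ConcurrentMap<Long, long[]> facts = new ConcurrentHashMap<Long, long[]>();
  private final AtomicInteger numEntries = new AtomicInteger();

  int add(String dimension, long key)
  {
    // idiom 1: double-checked locking. The unsynchronized get() is the fast path;
    // only a thread that sees a brand-new dimension takes the lock, and the second
    // get() inside the lock decides which thread assigns the column index.
    Integer index = dimensionOrder.get(dimension);
    if (index == null) {
      synchronized (dimensionOrder) {
        index = dimensionOrder.get(dimension);
        if (index == null) {
          dimensionOrder.put(dimension, dimensionOrder.size());
        }
      }
    }

    // idiom 2: putIfAbsent. If two threads build a row for the same key at once,
    // exactly one insert wins and the loser aggregates into the winner's row.
    long[] aggs = facts.get(key);
    if (aggs == null) {
      long[] candidate = new long[1];
      long[] prev = facts.putIfAbsent(key, candidate);
      if (prev == null) {
        aggs = candidate;
        numEntries.incrementAndGet();
      } else {
        aggs = prev;
      }
    }

    // idiom 3: aggregate under a lock; the real class must serialize here because
    // its selectors read a shared row field while the aggregators run.
    synchronized (this) {
      aggs[0]++;
    }
    return numEntries.get();
  }
}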

New file: io/druid/client/cache/BytesBoundedLinkedQueue.java (+383 lines)
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Abstract implementation of a BlockingQueue bounded by the total byte size of its elements.
* It works like LinkedBlockingQueue, except that capacity is the sum of the element sizes
* in bytes (as reported by getBytesSize) rather than a count of elements.
*/
public abstract class BytesBoundedLinkedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{
private final LinkedList<E> delegate;
private final AtomicLong currentSize = new AtomicLong(0);
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private long capacity;
public BytesBoundedLinkedQueue(long capacity)
{
delegate = new LinkedList<>();
this.capacity = capacity;
}
private static void checkNotNull(Object v)
{
if (v == null) {
throw new NullPointerException();
}
}
private void checkSize(E e)
{
if (getBytesSize(e) > capacity) {
throw new IllegalArgumentException(
String.format("cannot add element of size[%d] greater than capacity[%d]", getBytesSize(e), capacity)
);
}
}
public abstract long getBytesSize(E e);
public void elementAdded(E e)
{
currentSize.addAndGet(getBytesSize(e));
}
public void elementRemoved(E e)
{
currentSize.addAndGet(-1 * getBytesSize(e));
}
private void fullyUnlock()
{
takeLock.unlock();
putLock.unlock();
}
private void fullyLock()
{
takeLock.lock();
putLock.lock();
}
private void signalNotEmpty()
{
takeLock.lock();
try {
notEmpty.signal();
}
finally {
takeLock.unlock();
}
}
private void signalNotFull()
{
putLock.lock();
try {
notFull.signal();
}
finally {
putLock.unlock();
}
}
@Override
public int size()
{
return delegate.size();
}
@Override
public void put(E e) throws InterruptedException
{
while (!offer(e, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
// keep trying until added successfully
}
}
@Override
public boolean offer(
E e, long timeout, TimeUnit unit
) throws InterruptedException
{
checkNotNull(e);
checkSize(e);
long nanos = unit.toNanos(timeout);
boolean added = false;
putLock.lockInterruptibly();
try {
while (currentSize.get() + getBytesSize(e) > capacity) {
if (nanos <= 0) {
return false;
}
nanos = notFull.awaitNanos(nanos);
}
delegate.add(e);
elementAdded(e);
added = true;
}
finally {
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E take() throws InterruptedException
{
E e;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
notEmpty.await();
}
e = delegate.remove();
elementRemoved(e);
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public int remainingCapacity()
{
int delegateSize = delegate.size();
long currentByteSize = currentSize.get();
// return approximate remaining capacity based on current data
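// Worked example of the estimate below: capacity = 100 bytes with 4 queued
// elements totaling 40 bytes gives an average element size of 10 bytes, so we
// report (100 - 40) / 10 = 6 more elements' worth of space.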
if (delegateSize == 0) {
return (int) Math.min(capacity, Integer.MAX_VALUE);
} else if (capacity > currentByteSize) {
long averageElementSize = currentByteSize / delegateSize;
return (int) ((capacity - currentByteSize) / averageElementSize);
} else {
// queue full
return 0;
}
}
@Override
public int drainTo(Collection<? super E> c)
{
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements)
{
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
int n = 0;
takeLock.lock();
try {
n = Math.min(maxElements, delegate.size());
if (n < 0) {
return 0;
}
// takeLock is held, so the first n elements are stable and safe to remove
for (int i = 0; i < n; i++) {
E e = delegate.remove(0);
elementRemoved(e);
c.add(e);
}
}
finally {
takeLock.unlock();
}
if (n > 0) {
signalNotFull();
}
return n;
}
@Override
public boolean offer(E e)
{
checkNotNull(e);
checkSize(e);
boolean added = false;
putLock.lock();
try {
if (currentSize.get() + getBytesSize(e) > capacity) {
return false;
} else {
added = delegate.add(e);
if (added) {
elementAdded(e);
}
}
}
finally {
putLock.unlock();
}
if (added) {
signalNotEmpty();
}
return added;
}
@Override
public E poll()
{
E e = null;
takeLock.lock();
try {
e = delegate.poll();
if (e != null) {
elementRemoved(e);
}
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanos = unit.toNanos(timeout);
E e = null;
takeLock.lockInterruptibly();
try {
while (delegate.size() == 0) {
if (nanos <= 0) {
return null;
}
nanos = notEmpty.awaitNanos(nanos);
}
e = delegate.poll();
if (e != null) {
elementRemoved(e);
}
}
finally {
takeLock.unlock();
}
if (e != null) {
signalNotFull();
}
return e;
}
@Override
public E peek()
{
takeLock.lock();
try {
return delegate.peek();
}
finally {
takeLock.unlock();
}
}
@Override
public Iterator<E> iterator()
{
return new Itr(delegate.iterator());
}
private class Itr implements Iterator<E>
{
private final Iterator<E> delegate;
private E lastReturned;
Itr(Iterator<E> delegate)
{
this.delegate = delegate;
}
@Override
public boolean hasNext()
{
fullyLock();
try {
return delegate.hasNext();
}
finally {
fullyUnlock();
}
}
@Override
public E next()
{
fullyLock();
try {
this.lastReturned = delegate.next();
return lastReturned;
}
finally {
fullyUnlock();
}
}
@Override
public void remove()
{
fullyLock();
try {
if (this.lastReturned == null) {
throw new IllegalStateException();
}
delegate.remove();
elementRemoved(lastReturned);
signalNotFull();
lastReturned = null;
}
finally {
fullyUnlock();
}
}
}
}
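
A minimal usage sketch for the queue above, assuming byte[] payloads; any element type works as long as the subclass reports its size in bytes:

import java.util.concurrent.BlockingQueue;

class ByteBudgetExample
{
  public static void main(String[] args)
  {
    // the bound is 64 KB of payload in total, not a fixed element count
    BlockingQueue<byte[]> queue = new BytesBoundedLinkedQueue<byte[]>(64 * 1024)
    {
      @Override
      public long getBytesSize(byte[] element)
      {
        return element.length;
      }
    };
    queue.offer(new byte[16 * 1024]);                     // accepted: 16 KB of the budget used
    System.out.println(queue.offer(new byte[60 * 1024])); // false: would exceed the byte budget
  }
}

The MemcachedOperationQueueFactory added below uses the same pattern, sizing each memcached Operation by the bytes remaining in its buffer.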

Changed file: MemcachedCache.java

@@ -56,7 +56,7 @@ public class MemcachedCache implements Cache
     // always use compression
     transcoder.setCompressionThreshold(0);

+    MemcachedOperationQueueFactory queueFactory = new MemcachedOperationQueueFactory(config.getMaxOperationQueueSize());
     return new MemcachedCache(
         new MemcachedClient(
             new ConnectionFactoryBuilder().setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
@@ -68,6 +68,7 @@ public class MemcachedCache implements Cache
                                           .setShouldOptimize(true)
                                           .setOpQueueMaxBlockTime(config.getTimeout())
                                           .setOpTimeout(config.getTimeout())
+                                          .setOpQueueFactory(queueFactory)
                                           .build(),
             AddrUtil.getAddresses(config.getHosts())
         ),

Changed file: MemcachedCacheConfig.java

@@ -27,19 +27,17 @@ public class MemcachedCacheConfig
 {
   @JsonProperty
   private int expiration = 2592000; // What is this number?
   @JsonProperty
   private int timeout = 500;
   @JsonProperty
   @NotNull
   private String hosts;
   @JsonProperty
   private int maxObjectSize = 50 * 1024 * 1024;
   @JsonProperty
   private String memcachedPrefix = "druid";
+  @JsonProperty
+  private long maxOperationQueueSize = 256 * 1024 * 1024L; // 256 MB

   public int getExpiration()
   {
@@ -65,4 +63,9 @@ public class MemcachedCacheConfig
   {
     return memcachedPrefix;
   }
+
+  public long getMaxOperationQueueSize()
+  {
+    return maxOperationQueueSize;
+  }
 }

New file: io/druid/client/cache/MemcachedOperationQueueFactory.java (+48 lines)
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationQueueFactory;
import java.util.concurrent.BlockingQueue;
public class MemcachedOperationQueueFactory implements OperationQueueFactory
{
public final long maxQueueSize;
public MemcachedOperationQueueFactory(long maxQueueSize)
{
this.maxQueueSize = maxQueueSize;
}
@Override
public BlockingQueue<Operation> create()
{
return new BytesBoundedLinkedQueue<Operation>(maxQueueSize)
{
@Override
public long getBytesSize(Operation operation)
{
return operation.getBuffer().remaining();
}
};
}
}

New file: io/druid/client/cache/BytesBoundedLinkedQueueTest.java (+195 lines)
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.cache;
import com.metamx.common.ISE;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class BytesBoundedLinkedQueueTest
{
private static int delayMS = 50;
private ExecutorService exec = Executors.newCachedThreadPool();
private static BlockingQueue<TestObject> getQueue(final int capacity)
{
return new BytesBoundedLinkedQueue<TestObject>(capacity)
{
@Override
public long getBytesSize(TestObject o)
{
return o.getSize();
}
};
}
@Test
public void testPoll() throws InterruptedException
{
final BlockingQueue q = getQueue(10);
long startTime = System.nanoTime();
Assert.assertNull(q.poll(delayMS, TimeUnit.MILLISECONDS));
Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) >= delayMS);
TestObject obj = new TestObject(2);
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertSame(obj, q.poll(delayMS, TimeUnit.MILLISECONDS));
Thread.currentThread().interrupt();
try {
q.poll(delayMS, TimeUnit.MILLISECONDS);
throw new ISE("FAIL");
}
catch (InterruptedException success) {
}
Assert.assertFalse(Thread.interrupted());
}
@Test
public void testTake() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
Thread.currentThread().interrupt();
try {
q.take();
Assert.fail();
}
catch (InterruptedException success) {
//
}
final CountDownLatch latch = new CountDownLatch(1);
final TestObject object = new TestObject(4);
Future<TestObject> future = exec.submit(
new Callable<TestObject>()
{
@Override
public TestObject call() throws Exception
{
latch.countDown();
return q.take();
}
}
);
latch.await();
// test take blocks on empty queue
try {
future.get(delayMS, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (TimeoutException success) {
}
q.offer(object);
Assert.assertEquals(object, future.get());
}
@Test
public void testOfferAndPut() throws Exception
{
final BlockingQueue<TestObject> q = getQueue(10);
try {
q.offer(null);
Assert.fail();
}
catch (NullPointerException success) {
}
final TestObject obj = new TestObject(2);
while (q.remainingCapacity() > 0) {
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
}
// queue full
Assert.assertEquals(0, q.remainingCapacity());
Assert.assertFalse(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertFalse(q.offer(obj));
final CyclicBarrier barrier = new CyclicBarrier(2);
Future<Boolean> future = exec.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
barrier.await();
Assert.assertTrue(q.offer(obj, delayMS, TimeUnit.MILLISECONDS));
Assert.assertEquals(q.remainingCapacity(), 0);
barrier.await();
q.put(obj);
return true;
}
}
);
barrier.await();
q.take();
barrier.await();
q.take();
Assert.assertTrue(future.get());
}
@Test
public void testAddBiggerElementThanCapacityFails()
{
BlockingQueue<TestObject> q = getQueue(5);
try {
q.offer(new TestObject(10));
Assert.fail();
}
catch (IllegalArgumentException success) {
}
}
@Test
public void testAddedObjectExceedsCapacity() throws Exception
{
BlockingQueue<TestObject> q = getQueue(4);
Assert.assertTrue(q.offer(new TestObject(3)));
Assert.assertFalse(q.offer(new TestObject(2)));
Assert.assertFalse(q.offer(new TestObject(2), delayMS, TimeUnit.MILLISECONDS));
}
public static class TestObject
{
public final int size;
TestObject(int size)
{
this.size = size;
}
public int getSize()
{
return size;
}
}
}

Changed file: CliRealtime.java

@@ -34,7 +34,7 @@ import java.util.List;
 )
 public class CliRealtime extends ServerRunnable
 {
-  private static final Logger log = new Logger(CliBroker.class);
+  private static final Logger log = new Logger(CliRealtime.class);

   public CliRealtime()
   {