diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index af72bf3a0b2..124035e612f 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -130,146 +130,148 @@ public class IncrementalIndex implements Iterable, Closeable int currAggSize = 0; for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorizeBuffered( - new ColumnSelectorFactory() - { - @Override - public TimestampColumnSelector makeTimestampColumnSelector() - { - return new TimestampColumnSelector() + aggs[i] = new ThreadSafeAggregator( + agg.factorizeBuffered( + new ColumnSelectorFactory() { @Override - public long getTimestamp() + public TimestampColumnSelector makeTimestampColumnSelector() { - return in.get().getTimestampFromEpoch(); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - final String metricName = columnName.toLowerCase(); - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(metricName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) - { - final String typeName = agg.getTypeName(); - final String columnName = column.toLowerCase(); - - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return in.get().getRaw(columnName); - } - }; - - if (!deserializeComplexMetrics) { - return rawColumnSelector; - } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in.get(), columnName); - } - }; - } - } - - @Override - public DimensionSelector makeDimensionSelector(final String dimension) - { - final String dimensionName = dimension.toLowerCase(); - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = in.get().getDimension(dimensionName); - final ArrayList vals = Lists.newArrayList(); - if (dimensionValues != null) { - for (int i = 0; i < dimensionValues.size(); ++i) { - vals.add(i); - } - } - - return new IndexedInts() + return new TimestampColumnSelector() { @Override - public int size() + public long getTimestamp() { - return vals.size(); - } - - @Override - public int get(int index) - { - return vals.get(index); - } - - @Override - public Iterator iterator() - { - return vals.iterator(); + return in.get().getTimestampFromEpoch(); } }; } @Override - public int getValueCardinality() + public FloatColumnSelector makeFloatColumnSelector(String columnName) { - throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + final String metricName = columnName.toLowerCase(); + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(metricName); + } + }; } @Override - public String lookupName(int id) + public ObjectColumnSelector makeObjectColumnSelector(String column) { - return in.get().getDimension(dimensionName).get(id); + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + return in.get().getRaw(columnName); + } + }; + + if (!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; + } + + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } + + @Override + public Object get() + { + return extractor.extractValue(in.get(), columnName); + } + }; + } } @Override - public int lookupId(String name) + public DimensionSelector makeDimensionSelector(final String dimension) { - return in.get().getDimension(dimensionName).indexOf(name); + final String dimensionName = dimension.toLowerCase(); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = in.get().getDimension(dimensionName); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } + + return new IndexedInts() + { + @Override + public int size() + { + return vals.size(); + } + + @Override + public int get(int index) + { + return vals.get(index); + } + + @Override + public Iterator iterator() + { + return vals.iterator(); + } + }; + } + + @Override + public int getValueCardinality() + { + throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + } + + @Override + public String lookupName(int id) + { + return in.get().getDimension(dimensionName).get(id); + } + + @Override + public int lookupId(String name) + { + return in.get().getDimension(dimensionName).indexOf(name); + } + }; } - }; - } - } + } + ) ); aggPositionOffsets[i] = currAggSize; currAggSize += agg.getMaxIntermediateSize(); @@ -458,9 +460,7 @@ public class IncrementalIndex implements Iterable, Closeable in.set(row); int rowOffset = facts.get(key); for (int i = 0; i < aggs.length; i++) { - synchronized (aggs[i]) { aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); - } } in.set(null); return numEntries.get(); @@ -473,7 +473,7 @@ public class IncrementalIndex implements Iterable, Closeable /** * - * @return true if the underlying buffer for IncrementalIndex is full and cannot accomodate more rows. + * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows. */ public boolean isFull() { @@ -861,4 +861,45 @@ public class IncrementalIndex implements Iterable, Closeable } } } + + private static class ThreadSafeAggregator implements BufferAggregator + { + + private final BufferAggregator delegate; + + public ThreadSafeAggregator(BufferAggregator delegate) + { + this.delegate = delegate; + } + + @Override + public synchronized void init(ByteBuffer buf, int position) + { + delegate.init(buf, position); + } + + @Override + public synchronized void aggregate(ByteBuffer buf, int position) + { + delegate.aggregate(buf, position); + } + + @Override + public synchronized Object get(ByteBuffer buf, int position) + { + return delegate.get(buf, position); + } + + @Override + public synchronized float getFloat(ByteBuffer buf, int position) + { + return delegate.getFloat(buf, position); + } + + @Override + public synchronized void close() + { + delegate.close(); + } + } } diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 29d93deb5a0..52df7cafbe4 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -36,7 +36,7 @@ import java.io.File; */ public class RealtimeTuningConfig implements TuningConfig { - private static final int defaultBufferSize = 512 * 1024 * 1024; + private static final int defaultBufferSize = 256 * 1024 * 1024; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final File defaultBasePersistDirectory = Files.createTempDir(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 87f2ef4a9ba..03315201520 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -136,7 +136,7 @@ public class Sink implements Iterable public boolean isFull() { synchronized (currHydrant){ - return currHydrant.getIndex().isFull(); + return currHydrant != null && currHydrant.getIndex().isFull(); } }