fixes from review comments

fix sync of aggs,
fix NPE in sink.isFull,
RealtimeTuningConfig lower the bufferSize to 256m
This commit is contained in:
nishantmonu51 2014-08-21 09:24:04 +05:30
parent d64879ccca
commit 67f4bbae74
3 changed files with 165 additions and 124 deletions

View File

@ -130,146 +130,148 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
aggs[i] = new ThreadSafeAggregator(
agg.factorizeBuffered(
new ColumnSelectorFactory()
{
@Override
public long getTimestamp()
public TimestampColumnSelector makeTimestampColumnSelector()
{
return in.get().getTimestampFromEpoch();
}
};
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(metricName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(columnName);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), columnName);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
final String dimensionName = dimension.toLowerCase();
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimensionName);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
return new TimestampColumnSelector()
{
@Override
public int size()
public long getTimestamp()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
return in.get().getTimestampFromEpoch();
}
};
}
@Override
public int getValueCardinality()
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(metricName);
}
};
}
@Override
public String lookupName(int id)
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
return in.get().getDimension(dimensionName).get(id);
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(columnName);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), columnName);
}
};
}
}
@Override
public int lookupId(String name)
public DimensionSelector makeDimensionSelector(final String dimension)
{
return in.get().getDimension(dimensionName).indexOf(name);
final String dimensionName = dimension.toLowerCase();
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimensionName);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimensionName).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimensionName).indexOf(name);
}
};
}
};
}
}
}
)
);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
@ -458,9 +460,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
in.set(row);
int rowOffset = facts.get(key);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
in.set(null);
return numEntries.get();
@ -473,7 +473,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
/**
*
* @return true if the underlying buffer for IncrementalIndex is full and cannot accomodate more rows.
* @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows.
*/
public boolean isFull()
{
@ -861,4 +861,45 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
}
private static class ThreadSafeAggregator implements BufferAggregator
{
private final BufferAggregator delegate;
public ThreadSafeAggregator(BufferAggregator delegate)
{
this.delegate = delegate;
}
@Override
public synchronized void init(ByteBuffer buf, int position)
{
delegate.init(buf, position);
}
@Override
public synchronized void aggregate(ByteBuffer buf, int position)
{
delegate.aggregate(buf, position);
}
@Override
public synchronized Object get(ByteBuffer buf, int position)
{
return delegate.get(buf, position);
}
@Override
public synchronized float getFloat(ByteBuffer buf, int position)
{
return delegate.getFloat(buf, position);
}
@Override
public synchronized void close()
{
delegate.close();
}
}
}

View File

@ -36,7 +36,7 @@ import java.io.File;
*/
public class RealtimeTuningConfig implements TuningConfig
{
private static final int defaultBufferSize = 512 * 1024 * 1024;
private static final int defaultBufferSize = 256 * 1024 * 1024;
private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
private static final Period defaultWindowPeriod = new Period("PT10M");
private static final File defaultBasePersistDirectory = Files.createTempDir();

View File

@ -136,7 +136,7 @@ public class Sink implements Iterable<FireHydrant>
public boolean isFull()
{
synchronized (currHydrant){
return currHydrant.getIndex().isFull();
return currHydrant != null && currHydrant.getIndex().isFull();
}
}