Better handle null adapters and NPEs in the CQE

This commit is contained in:
fjy 2014-04-22 12:47:53 -07:00
parent 560045ddd2
commit 8fd39c63d5
10 changed files with 140 additions and 99 deletions

View File

@ -74,7 +74,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>emitter</artifactId> <artifactId>emitter</artifactId>
<version>0.2.10</version> <version>0.2.11</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>

View File

@ -35,8 +35,10 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
/** /**
@ -110,7 +112,11 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
if (input == null) { if (input == null) {
throw new ISE("Input is null?! How is this possible?!"); throw new ISE("Input is null?! How is this possible?!");
} }
return Sequences.toList(input.run(query), Lists.<T>newArrayList()); Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
return Sequences.toList(result, Lists.<T>newArrayList());
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception with one of the sequences!"); log.error(e, "Exception with one of the sequences!");
@ -156,4 +162,20 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
} }
); );
} }
public static void main(String[] args) throws Exception
{
ExecutorService foo = Executors.newFixedThreadPool(1);
Future test = foo.submit(
new Callable<List>()
{
@Override
public List call() throws Exception
{
throw new ISE("");
}
}
);
System.out.println(Lists.newArrayList(test));
}
} }

View File

@ -70,7 +70,7 @@ public class GroupByQueryEngine
private final StupidPool<ByteBuffer> intermediateResultsBufferPool; private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
@Inject @Inject
public GroupByQueryEngine ( public GroupByQueryEngine(
Supplier<GroupByQueryConfig> config, Supplier<GroupByQueryConfig> config,
@Global StupidPool<ByteBuffer> intermediateResultsBufferPool @Global StupidPool<ByteBuffer> intermediateResultsBufferPool
) )
@ -81,6 +81,12 @@ public class GroupByQueryEngine
public Sequence<Row> process(final GroupByQuery query, StorageAdapter storageAdapter) public Sequence<Row> process(final GroupByQuery query, StorageAdapter storageAdapter)
{ {
if (storageAdapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals(); final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) { if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals); throw new IAE("Should only have one interval, got[%s]", intervals);
@ -187,8 +193,7 @@ public class GroupByQueryEngine
ByteBuffer newKey = key.duplicate(); ByteBuffer newKey = key.duplicate();
newKey.putInt(dimSelector.getValueCardinality()); newKey.putInt(dimSelector.getValueCardinality());
unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size())); unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
} } else {
else {
for (Integer dimValue : row) { for (Integer dimValue : row) {
ByteBuffer newKey = key.duplicate(); ByteBuffer newKey = key.duplicate();
newKey.putInt(dimValue); newKey.putInt(dimValue);
@ -202,8 +207,7 @@ public class GroupByQueryEngine
retVal.addAll(unaggregatedBuffers); retVal.addAll(unaggregatedBuffers);
} }
return retVal; return retVal;
} } else {
else {
key.clear(); key.clear();
Integer position = positions.get(key); Integer position = positions.get(key);
int[] increments = positionMaintainer.getIncrements(); int[] increments = positionMaintainer.getIncrements();
@ -267,8 +271,7 @@ public class GroupByQueryEngine
{ {
if (nextVal > max) { if (nextVal > max) {
return null; return null;
} } else {
else {
int retVal = (int) nextVal; int retVal = (int) nextVal;
nextVal += increment; nextVal += increment;
return retVal; return retVal;
@ -402,7 +405,7 @@ public class GroupByQueryEngine
final DimExtractionFn fn = dimensionSpecs.get(i).getDimExtractionFn(); final DimExtractionFn fn = dimensionSpecs.get(i).getDimExtractionFn();
final int dimVal = keyBuffer.getInt(); final int dimVal = keyBuffer.getInt();
if (dimSelector.getValueCardinality() != dimVal) { if (dimSelector.getValueCardinality() != dimVal) {
if(fn != null) { if (fn != null) {
theEvent.put(dimNames.get(i), fn.apply(dimSelector.lookupName(dimVal))); theEvent.put(dimNames.get(i), fn.apply(dimSelector.lookupName(dimVal)));
} else { } else {
theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal)); theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
@ -434,9 +437,10 @@ public class GroupByQueryEngine
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
public void close() { public void close()
{
// cleanup // cleanup
for(BufferAggregator agg : aggregators) { for (BufferAggregator agg : aggregators) {
agg.close(); agg.close();
} }
} }

View File

@ -32,7 +32,6 @@ import java.util.Map;
public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis> public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{ {
private final ColumnIncluderator toInclude; private final ColumnIncluderator toInclude;
private final boolean merge; private final boolean merge;

View File

@ -55,7 +55,7 @@ import java.util.Map;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
*/ */
public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>> public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
{ {
private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class); private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class);
@ -99,12 +99,10 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
ConciseSet set = new ConciseSet(); ConciseSet set = new ConciseSet();
set.add(0); set.add(0);
baseFilter = ImmutableConciseSet.newImmutableFromMutable(set); baseFilter = ImmutableConciseSet.newImmutableFromMutable(set);
} } else {
else {
baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows()); baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows());
} }
} } else {
else {
baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index)); baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index));
} }
@ -133,7 +131,16 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
} }
final StorageAdapter adapter = segment.asStorageAdapter(); final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter != null) {
if (adapter == null) {
log.makeAlert("WTF!? Unable to process search query on segment.")
.addData("segment", segment.getIdentifier())
.addData("query", query).emit();
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
Iterable<String> dimsToSearch; Iterable<String> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) { if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = adapter.getAvailableDimensions(); dimsToSearch = adapter.getAvailableDimensions();
@ -172,12 +179,6 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
return makeReturnResult(limit, retVal); return makeReturnResult(limit, retVal);
} }
log.makeAlert("WTF!? Unable to process search query on segment.")
.addData("segment", segment.getIdentifier())
.addData("query", query);
return Sequences.empty();
}
private Sequence<Result<SearchResultValue>> makeReturnResult(int limit, TreeSet<SearchHit> retVal) private Sequence<Result<SearchResultValue>> makeReturnResult(int limit, TreeSet<SearchHit> retVal)
{ {
return Sequences.simple( return Sequences.simple(

View File

@ -22,6 +22,7 @@ package io.druid.query.select;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunnerHelper; import io.druid.query.QueryRunnerHelper;
@ -54,6 +55,12 @@ public class SelectQueryEngine
{ {
final StorageAdapter adapter = segment.asStorageAdapter(); final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final Iterable<String> dims; final Iterable<String> dims;
if (query.getDimensions() == null || query.getDimensions().isEmpty()) { if (query.getDimensions() == null || query.getDimensions().isEmpty()) {
dims = adapter.getAvailableDimensions(); dims = adapter.getAvailableDimensions();

View File

@ -87,6 +87,12 @@ public class TimeBoundaryQueryRunnerFactory
@Override @Override
public Iterator<Result<TimeBoundaryResultValue>> make() public Iterator<Result<TimeBoundaryResultValue>> make()
{ {
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
return legacyQuery.buildResult( return legacyQuery.buildResult(
adapter.getInterval().getStart(), adapter.getInterval().getStart(),
adapter.getMinTime(), adapter.getMinTime(),

View File

@ -20,6 +20,7 @@
package io.druid.query.timeseries; package io.druid.query.timeseries;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunnerHelper; import io.druid.query.QueryRunnerHelper;
@ -40,6 +41,12 @@ public class TimeseriesQueryEngine
{ {
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter) public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
{ {
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
return new BaseSequence<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>( return new BaseSequence<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>(
new BaseSequence.IteratorMaker<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>() new BaseSequence.IteratorMaker<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>()
{ {

View File

@ -21,7 +21,7 @@ package io.druid.query.topn;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool; import io.druid.collections.StupidPool;
@ -53,6 +53,12 @@ public class TopNQueryEngine
public Iterable<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter) public Iterable<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
{ {
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> queryIntervals = query.getQuerySegmentSpec().getIntervals(); final List<Interval> queryIntervals = query.getQuerySegmentSpec().getIntervals();
final Filter filter = Filters.convertDimensionFilters(query.getDimensionsFilter()); final Filter filter = Filters.convertDimensionFilters(query.getDimensionsFilter());
final QueryGranularity granularity = query.getGranularity(); final QueryGranularity granularity = query.getGranularity();
@ -62,10 +68,6 @@ public class TopNQueryEngine
queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
); );
if (mapFn == null) {
return Lists.newArrayList();
}
return FunctionalIterable return FunctionalIterable
.create(adapter.makeCursors(filter, queryIntervals.get(0), granularity)) .create(adapter.makeCursors(filter, queryIntervals.get(0), granularity))
.transform( .transform(
@ -84,13 +86,6 @@ public class TopNQueryEngine
private Function<Cursor, Result<TopNResultValue>> getMapFn(TopNQuery query, final StorageAdapter adapter) private Function<Cursor, Result<TopNResultValue>> getMapFn(TopNQuery query, final StorageAdapter adapter)
{ {
if (adapter == null) {
log.warn(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped. Returning empty results."
);
return null;
}
final Capabilities capabilities = adapter.getCapabilities(); final Capabilities capabilities = adapter.getCapabilities();
final int cardinality = adapter.getDimensionCardinality(query.getDimensionSpec().getDimension()); final int cardinality = adapter.getDimensionCardinality(query.getDimensionSpec().getDimension());
int numBytesPerRecord = 0; int numBytesPerRecord = 0;