Merge pull request #502 from metamx/fix-NPE-CQE

Better handle null adapters and NPEs in the CQE
This commit is contained in:
xvrl 2014-04-22 13:16:14 -07:00
commit d63107f890
11 changed files with 131 additions and 112 deletions

View File

@ -74,7 +74,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.10</version>
<version>0.2.11</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -35,8 +35,10 @@ import com.metamx.common.logger.Logger;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@ -110,7 +112,11 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
return Sequences.toList(input.run(query), Lists.<T>newArrayList());
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
return Sequences.toList(result, Lists.<T>newArrayList());
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");

View File

@ -70,7 +70,7 @@ public class GroupByQueryEngine
private final StupidPool<ByteBuffer> intermediateResultsBufferPool;
@Inject
public GroupByQueryEngine (
public GroupByQueryEngine(
Supplier<GroupByQueryConfig> config,
@Global StupidPool<ByteBuffer> intermediateResultsBufferPool
)
@ -81,6 +81,12 @@ public class GroupByQueryEngine
public Sequence<Row> process(final GroupByQuery query, StorageAdapter storageAdapter)
{
if (storageAdapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
@ -187,8 +193,7 @@ public class GroupByQueryEngine
ByteBuffer newKey = key.duplicate();
newKey.putInt(dimSelector.getValueCardinality());
unaggregatedBuffers = updateValues(newKey, dims.subList(1, dims.size()));
}
else {
} else {
for (Integer dimValue : row) {
ByteBuffer newKey = key.duplicate();
newKey.putInt(dimValue);
@ -202,8 +207,7 @@ public class GroupByQueryEngine
retVal.addAll(unaggregatedBuffers);
}
return retVal;
}
else {
} else {
key.clear();
Integer position = positions.get(key);
int[] increments = positionMaintainer.getIncrements();
@ -267,8 +271,7 @@ public class GroupByQueryEngine
{
if (nextVal > max) {
return null;
}
else {
} else {
int retVal = (int) nextVal;
nextVal += increment;
return retVal;
@ -402,7 +405,7 @@ public class GroupByQueryEngine
final DimExtractionFn fn = dimensionSpecs.get(i).getDimExtractionFn();
final int dimVal = keyBuffer.getInt();
if (dimSelector.getValueCardinality() != dimVal) {
if(fn != null) {
if (fn != null) {
theEvent.put(dimNames.get(i), fn.apply(dimSelector.lookupName(dimVal)));
} else {
theEvent.put(dimNames.get(i), dimSelector.lookupName(dimVal));
@ -434,9 +437,10 @@ public class GroupByQueryEngine
throw new UnsupportedOperationException();
}
public void close() {
public void close()
{
// cleanup
for(BufferAggregator agg : aggregators) {
for (BufferAggregator agg : aggregators) {
agg.close();
}
}

View File

@ -104,47 +104,47 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
)
{
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<SegmentAnalysis>, QueryRunner<SegmentAnalysis>>()
{
@Override
public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
{
return new QueryRunner<SegmentAnalysis>()
{
@Override
public QueryRunner<SegmentAnalysis> apply(final QueryRunner<SegmentAnalysis> input)
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
return new QueryRunner<SegmentAnalysis>()
{
@Override
public Sequence<SegmentAnalysis> run(final Query<SegmentAnalysis> query)
{
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new Callable<Sequence<SegmentAnalysis>>()
{
@Override
public Sequence<SegmentAnalysis> call() throws Exception
{
return new ExecutorExecutingSequence<SegmentAnalysis>(
input.run(query),
queryExecutor
);
}
}
);
try {
return future.get();
Future<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new Callable<Sequence<SegmentAnalysis>>()
{
@Override
public Sequence<SegmentAnalysis> call() throws Exception
{
return new ExecutorExecutingSequence<SegmentAnalysis>(
input.run(query),
queryExecutor
);
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
}
};
);
try {
return future.get();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
}
}
)
);
};
}
}
)
);
}
@Override

View File

@ -32,7 +32,6 @@ import java.util.Map;
public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
{
private final ColumnIncluderator toInclude;
private final boolean merge;

View File

@ -55,7 +55,7 @@ import java.util.Map;
import java.util.TreeSet;
/**
*/
*/
public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class);
@ -99,12 +99,10 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
ConciseSet set = new ConciseSet();
set.add(0);
baseFilter = ImmutableConciseSet.newImmutableFromMutable(set);
}
else {
} else {
baseFilter = ImmutableConciseSet.complement(new ImmutableConciseSet(), index.getNumRows());
}
}
else {
} else {
baseFilter = filter.goConcise(new ColumnSelectorBitmapIndexSelector(index));
}
@ -133,49 +131,52 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
}
final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter != null) {
Iterable<String> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = adapter.getAvailableDimensions();
} else {
dimsToSearch = dimensions;
if (adapter == null) {
log.makeAlert("WTF!? Unable to process search query on segment.")
.addData("segment", segment.getIdentifier())
.addData("query", query).emit();
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
Iterable<String> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = adapter.getAvailableDimensions();
} else {
dimsToSearch = dimensions;
}
final TreeSet<SearchHit> retVal = Sets.newTreeSet(query.getSort().getComparator());
final Iterable<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL);
for (Cursor cursor : cursors) {
Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dimsToSearch) {
dimSelectors.put(dim, cursor.makeDimensionSelector(dim));
}
final TreeSet<SearchHit> retVal = Sets.newTreeSet(query.getSort().getComparator());
final Iterable<Cursor> cursors = adapter.makeCursors(filter, segment.getDataInterval(), QueryGranularity.ALL);
for (Cursor cursor : cursors) {
Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dimsToSearch) {
dimSelectors.put(dim, cursor.makeDimensionSelector(dim));
}
while (!cursor.isDone()) {
for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
final DimensionSelector selector = entry.getValue();
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
retVal.add(new SearchHit(entry.getKey(), dimVal));
if (retVal.size() >= limit) {
return makeReturnResult(limit, retVal);
}
while (!cursor.isDone()) {
for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
final DimensionSelector selector = entry.getValue();
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
retVal.add(new SearchHit(entry.getKey(), dimVal));
if (retVal.size() >= limit) {
return makeReturnResult(limit, retVal);
}
}
}
cursor.advance();
}
}
return makeReturnResult(limit, retVal);
cursor.advance();
}
}
log.makeAlert("WTF!? Unable to process search query on segment.")
.addData("segment", segment.getIdentifier())
.addData("query", query);
return Sequences.empty();
return makeReturnResult(limit, retVal);
}
private Sequence<Result<SearchResultValue>> makeReturnResult(int limit, TreeSet<SearchHit> retVal)

View File

@ -22,6 +22,7 @@ package io.druid.query.select;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunnerHelper;
@ -54,6 +55,12 @@ public class SelectQueryEngine
{
final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final Iterable<String> dims;
if (query.getDimensions() == null || query.getDimensions().isEmpty()) {
dims = adapter.getAvailableDimensions();

View File

@ -87,6 +87,12 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public Iterator<Result<TimeBoundaryResultValue>> make()
{
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
return legacyQuery.buildResult(
adapter.getInterval().getStart(),
adapter.getMinTime(),

View File

@ -20,6 +20,7 @@
package io.druid.query.timeseries;
import com.google.common.base.Function;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import io.druid.query.QueryRunnerHelper;
@ -40,6 +41,12 @@ public class TimeseriesQueryEngine
{
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
return new BaseSequence<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>(
new BaseSequence.IteratorMaker<Result<TimeseriesResultValue>, Iterator<Result<TimeseriesResultValue>>>()
{

View File

@ -21,7 +21,7 @@ package io.druid.query.topn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
@ -53,6 +53,12 @@ public class TopNQueryEngine
public Iterable<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final List<Interval> queryIntervals = query.getQuerySegmentSpec().getIntervals();
final Filter filter = Filters.convertDimensionFilters(query.getDimensionsFilter());
final QueryGranularity granularity = query.getGranularity();
@ -62,10 +68,6 @@ public class TopNQueryEngine
queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
);
if (mapFn == null) {
return Lists.newArrayList();
}
return FunctionalIterable
.create(adapter.makeCursors(filter, queryIntervals.get(0), granularity))
.transform(
@ -84,13 +86,6 @@ public class TopNQueryEngine
private Function<Cursor, Result<TopNResultValue>> getMapFn(TopNQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
log.warn(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped. Returning empty results."
);
return null;
}
final Capabilities capabilities = adapter.getCapabilities();
final int cardinality = adapter.getDimensionCardinality(query.getDimensionSpec().getDimension());
int numBytesPerRecord = 0;

View File

@ -29,8 +29,7 @@ import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.guice.annotations.Json;
@ -57,7 +56,7 @@ import java.util.UUID;
@Path("/druid/v2/")
public class QueryResource
{
private static final Logger log = new Logger(QueryResource.class);
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final Joiner COMMA_JOIN = Joiner.on(",");
@ -192,16 +191,11 @@ public class QueryResource
log.error(e2, "Unable to log query [%s]!", queryString);
}
emitter.emit(
new AlertEvent.Builder().build(
"Exception handling request",
ImmutableMap.<String, Object>builder()
.put("exception", e.toString())
.put("query", queryString)
.put("peer", req.getRemoteAddr())
.build()
)
);
log.makeAlert(e, "Exception handling request")
.addData("exception", e.toString())
.addData("query", queryString)
.addData("peer", req.getRemoteAddr())
.emit();
}
finally {
resp.flushBuffer();