fix concurrency issue with the map; introduce new exception; add incomplete retry query runner test

This commit is contained in:
jisookim0513 2014-06-19 14:14:54 -07:00
parent 8515a11787
commit 35e080bbc1
8 changed files with 149 additions and 18 deletions

View File

@ -21,8 +21,8 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.segment.SegmentMissingException;
import java.util.Arrays;
import java.util.List;
@ -30,6 +30,7 @@ import java.util.Map;
public class RetryQueryRunner<T> implements QueryRunner<T>
{
public static String missingSegments = "missingSegments";
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
private final RetryQueryRunnerConfig config;
@ -46,23 +47,32 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
{
  Sequence<T> returningSeq = baseRunner.run(query, context);

  // Keep retrying as long as the cluster reports missing segments, up to
  // config.numTries() passes.
  for (int i = config.numTries(); i > 0 && !((List) context.get(missingSegments)).isEmpty(); i--) {
    // Snapshot the reported descriptors BEFORE clearing the context entry:
    // context holds a live reference to the same list object, so clearing it
    // in place would also empty the list we are about to retry with (the
    // retry would then run against an empty segment spec and do nothing).
    // toArray() produces an independent copy; Arrays.asList is read-only from
    // here on, which is fine because the spec only reads it.
    List<SegmentDescriptor> segList = Arrays.asList(
        ((List<SegmentDescriptor>) context.get(missingSegments)).toArray(new SegmentDescriptor[0])
    );
    ((List) context.get(missingSegments)).clear();

    // Merge the partial results gathered so far with a re-query restricted to
    // exactly the segments that were reported missing. The retried run may
    // repopulate context's missingSegments entry, driving the next iteration.
    returningSeq = toolChest.mergeSequences(
        Sequences.simple(
            Arrays.asList(
                returningSeq,
                baseRunner.run(
                    query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(segList)),
                    context
                )
            )
        )
    );
  }

  // Retries exhausted: either surface the still-missing segments as an error,
  // or hand back whatever partial results we collected, per configuration.
  if (!config.returnPartialResults() && !((List) context.get(missingSegments)).isEmpty()) {
    StringBuilder failedSegments = new StringBuilder();
    for (SegmentDescriptor segment : (List<SegmentDescriptor>) context.get(missingSegments)) {
      failedSegments.append(segment.toString()).append(' ');
    }
    throw new SegmentMissingException("The following segments are missing: " + failedSegments);
  }

  return returningSeq;
}
}

View File

@ -25,6 +25,8 @@ public class RetryQueryRunnerConfig
{
  // Maximum number of retry passes RetryQueryRunner makes over segments that
  // were reported missing.
  @JsonProperty
  private int numTries = 1;

  // Whether missing segments remaining after the final retry are tolerated
  // (true) or raised as a SegmentMissingException (false).
  // NOTE(review): without @JsonProperty this field could never be populated
  // from JSON configuration, so it would always be false and the partial-result
  // behavior could never be enabled.
  @JsonProperty
  private boolean returnPartialResults = false;

  public int numTries() { return numTries; }

  public boolean returnPartialResults() { return returnPartialResults; }
}

View File

@ -27,7 +27,7 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.NullStorageAdapterException;
import io.druid.segment.SegmentMissingException;
import java.io.IOException;
import java.util.List;
@ -67,7 +67,7 @@ public class SpecificSegmentQueryRunner<T> implements QueryRunner<T>
Sequence<T> returningSeq;
try {
returningSeq = base.run(query, context);
} catch (NullStorageAdapterException e) {
} catch (SegmentMissingException e) {
((List)context.get("missingSegments")).add(((SpecificSegmentSpec) specificSpec).getDescriptor());
returningSeq = Sequences.empty();
}

View File

@ -27,7 +27,7 @@ import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.Cursor;
import io.druid.segment.NullStorageAdapterException;
import io.druid.segment.SegmentMissingException;
import io.druid.segment.StorageAdapter;
import io.druid.segment.filter.Filters;
@ -40,7 +40,7 @@ public class TimeseriesQueryEngine
public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
throw new NullStorageAdapterException(
throw new SegmentMissingException(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}

View File

@ -32,7 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.filter.Filter;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.NullStorageAdapterException;
import io.druid.segment.SegmentMissingException;
import io.druid.segment.StorageAdapter;
import io.druid.segment.filter.Filters;
import org.joda.time.Interval;
@ -56,7 +56,7 @@ public class TopNQueryEngine
public Sequence<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
throw new NullStorageAdapterException(
throw new SegmentMissingException(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}

View File

@ -19,9 +19,11 @@
package io.druid.segment;
public class NullStorageAdapterException extends IllegalStateException
import com.metamx.common.ISE;
public class SegmentMissingException extends ISE
{
public NullStorageAdapterException(String formatText, Object... arguments) {
public SegmentMissingException(String formatText, Object... arguments) {
super(String.format(formatText, arguments));
}
}

View File

@ -0,0 +1,116 @@
package io.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Tests for {@link RetryQueryRunner}'s missing-segment bookkeeping.
 * NOTE(review): the commit message describes this test as incomplete — the
 * retry-success path and the returnPartialResults=false (exception) path are
 * not covered yet; TODO add them.
 */
public class RetryQueryRunnerTest
{
  // Day-granularity timeseries query over the standard test datasource. The
  // query contents are incidental: the test only exercises retry behavior.
  final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
      .dataSource(QueryRunnerTestHelper.dataSource)
      .granularity(QueryRunnerTestHelper.dayGran)
      .intervals(QueryRunnerTestHelper.firstToThird)
      .aggregators(
          Arrays.<AggregatorFactory>asList(
              QueryRunnerTestHelper.rowsCount,
              new LongSumAggregatorFactory(
                  "idx",
                  "index"
              ),
              QueryRunnerTestHelper.qualityUniques
          )
      )
      .build();

  @Test
  public void testRunWithMissingSegments() throws Exception
  {
    // MapMaker().makeMap() yields a concurrent map — presumably mirroring how
    // QueryResource now builds the response context; confirm against caller.
    Map<String, Object> context = new MapMaker().makeMap();
    context.put("missingSegments", Lists.newArrayList());
    RetryQueryRunner runner = new RetryQueryRunner(
        // Base-runner stub: reports one missing segment on every invocation and
        // returns no rows, so a retry can never "recover" the segment.
        new QueryRunner()
        {
          @Override
          public Sequence run(Query query, Map context)
          {
            ((List)context.get(RetryQueryRunner.missingSegments)).add(new SegmentDescriptor(new Interval(178888, 1999999), "test", 1));
            return Sequences.empty();
          }
        },
        // Minimal tool chest: only mergeSequences is used by RetryQueryRunner
        // here; the remaining overrides are unused and return null.
        new QueryToolChest()
        {
          @Override
          public QueryRunner mergeResults(QueryRunner runner)
          {
            return null;
          }

          @Override
          public Sequence mergeSequences(Sequence seqOfSequences)
          {
            return new OrderedMergeSequence<Result<TimeseriesResultValue>>(getOrdering(), seqOfSequences);
          }

          @Override
          public ServiceMetricEvent.Builder makeMetricBuilder(Query query)
          {
            return null;
          }

          @Override
          public Function makePreComputeManipulatorFn(
              Query query, MetricManipulationFn fn
          )
          {
            return null;
          }

          @Override
          public TypeReference getResultTypeReference()
          {
            return null;
          }

          // Natural ordering is sufficient for merging the (empty) sequences.
          public Ordering<Result<TimeseriesResultValue>> getOrdering()
          {
            return Ordering.natural();
          }
        },
        // returnPartialResults=true: segments still missing after the final
        // retry are tolerated rather than raising SegmentMissingException.
        new RetryQueryRunnerConfig()
        {
          private int numTries = 1;
          private boolean returnPartialResults = true;

          public int numTries() { return numTries; }

          public boolean returnPartialResults() { return returnPartialResults; }
        }
    );

    Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
        runner.run(query, context),
        Lists.<Result<TimeseriesResultValue>>newArrayList()
    );
    // The stub never produces rows, so the merged sequence must be empty.
    Assert.assertTrue("Should return an empty sequence as a result", ((List) actualResults).size() == 0);
  }
}

View File

@ -22,9 +22,11 @@ package io.druid.server;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.api.client.util.Lists;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
@ -57,9 +59,8 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
@ -142,8 +143,8 @@ public class QueryResource
log.debug("Got query [%s]", query);
}
HashMap<String, Object> context = new HashMap<String, Object>();
context.put("missingSegments", new LinkedList());
Map<String, Object> context = new MapMaker().makeMap();
context.put("missingSegments", Lists.newArrayList());
Sequence results = query.run(texasRanger, context);
if (results == null) {