change the way to use RetryQueryRunnerConfig

This commit is contained in:
jisookim0513 2014-06-18 17:15:04 -07:00
parent d8430b854d
commit f4b1dc032b
3 changed files with 12 additions and 6 deletions

View File

@ -32,11 +32,13 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
{ {
private final QueryRunner<T> baseRunner; private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest; private final QueryToolChest<T, Query<T>> toolChest;
private final RetryQueryRunnerConfig config;
public RetryQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest) public RetryQueryRunner(QueryRunner<T> baseRunner, QueryToolChest<T, Query<T>> toolChest, RetryQueryRunnerConfig config)
{ {
this.baseRunner = baseRunner; this.baseRunner = baseRunner;
this.toolChest = toolChest; this.toolChest = toolChest;
this.config = config;
} }
@Override @Override
@ -44,7 +46,7 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
{ {
Sequence<T> returningSeq = baseRunner.run(query, metadata); Sequence<T> returningSeq = baseRunner.run(query, metadata);
for (int i = RetryQueryRunnerConfig.numTries(); i > 0; i--) { for (int i = config.numTries(); i > 0; i--) {
for (int j = metadata.get("missingSegments").size(); j > 0; j--) { for (int j = metadata.get("missingSegments").size(); j > 0; j--) {
QuerySegmentSpec segmentSpec = new SpecificSegmentSpec((SegmentDescriptor)metadata.get("missingSegments").remove(0)); QuerySegmentSpec segmentSpec = new SpecificSegmentSpec((SegmentDescriptor)metadata.get("missingSegments").remove(0));
returningSeq = toolChest.mergeSequences( returningSeq = toolChest.mergeSequences(

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class RetryQueryRunnerConfig public class RetryQueryRunnerConfig
{ {
@JsonProperty @JsonProperty
private static int numTries = 1; private int numTries = 1;
public static int numTries() { return numTries; } public int numTries() { return numTries; }
} }

View File

@ -32,6 +32,7 @@ import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.RetryQueryRunner; import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner; import io.druid.query.UnionQueryRunner;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -45,17 +46,20 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final CachingClusteredClient baseClient; private final CachingClusteredClient baseClient;
private final QueryToolChestWarehouse warehouse; private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
@Inject @Inject
public ClientQuerySegmentWalker( public ClientQuerySegmentWalker(
ServiceEmitter emitter, ServiceEmitter emitter,
CachingClusteredClient baseClient, CachingClusteredClient baseClient,
QueryToolChestWarehouse warehouse QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig
) )
{ {
this.emitter = emitter; this.emitter = emitter;
this.baseClient = baseClient; this.baseClient = baseClient;
this.warehouse = warehouse; this.warehouse = warehouse;
this.retryConfig = retryConfig;
} }
@Override @Override
@ -87,7 +91,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return toolChest.makeMetricBuilder(query); return toolChest.makeMetricBuilder(query);
} }
}, },
toolChest.preMergeQueryDecoration(new RetryQueryRunner<T>(baseClient, toolChest) toolChest.preMergeQueryDecoration(new RetryQueryRunner<T>(baseClient, toolChest, retryConfig)
) )
).withWaitMeasuredFromNow(), ).withWaitMeasuredFromNow(),
toolChest toolChest