1) Make RealtimeIndexTask merge results like it should

This commit is contained in:
cheddar 2013-05-01 13:08:13 -05:00
parent e1a2c43baf
commit e21e85b9fa
3 changed files with 49 additions and 36 deletions

View File

@ -28,6 +28,7 @@ import com.google.common.io.Closeables;
import com.metamx.common.exception.FormattedException;
import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskLock;
@ -37,18 +38,20 @@ import com.metamx.druid.merger.common.actions.LockAcquireAction;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.LockReleaseAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.realtime.plumber.VersioningPolicy;
import com.metamx.emitter.EmittingLogger;
@ -84,6 +87,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private volatile Plumber plumber = null;
@JsonIgnore
private volatile TaskToolbox toolbox = null;
@JsonIgnore
private volatile GracefulShutdownFirehose firehose = null;
@ -141,7 +147,10 @@ public class RealtimeIndexTask extends AbstractTask
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
if (plumber != null) {
return plumber.getQueryRunner(query);
QueryRunnerFactory<T, Query<T>> factory = toolbox.getQueryRunnerFactoryConglomerate().findFactory(query);
QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
} else {
return null;
}
@ -257,6 +266,7 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());
this.toolbox = toolbox;
this.plumber = realtimePlumberSchool.findPlumber(schema, metrics);
try {

View File

@ -225,7 +225,7 @@ public class RealtimeManager implements QuerySegmentWalker
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(plumber.getQueryRunner(query)), toolChest);
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
}
public void close() throws IOException

View File

@ -230,10 +230,11 @@ public class RealtimePlumberSchool implements PlumberSchool
public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
final Function<Query<T>, ServiceMetricEvent.Builder> builderFn =
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
private final QueryToolChest<T, Query<T>> toolchest = factory.getToolchest();
@Override
public ServiceMetricEvent.Builder apply(@Nullable Query<T> input)
@ -242,37 +243,39 @@ public class RealtimePlumberSchool implements PlumberSchool
}
};
return factory.mergeRunners(
EXEC,
FunctionalIterable
.create(sinks.values())
.transform(
new Function<Sink, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(@Nullable Sink input)
{
return new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
factory.mergeRunners(
EXEC,
Iterables.transform(
input,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(@Nullable FireHydrant input)
{
return factory.createRunner(input.getSegment());
}
}
return toolchest.mergeResults(
factory.mergeRunners(
EXEC,
FunctionalIterable
.create(sinks.values())
.transform(
new Function<Sink, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(@Nullable Sink input)
{
return new MetricsEmittingQueryRunner<T>(
emitter,
builderFn,
factory.mergeRunners(
EXEC,
Iterables.transform(
input,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(@Nullable FireHydrant input)
{
return factory.createRunner(input.getSegment());
}
}
)
)
)
);
}
}
)
);
}
}
)
)
);
}