port over multi threaded realtime and also fix broken realtime nodes that can't start up

This commit is contained in:
fjy 2013-09-16 16:03:47 -07:00
parent 9b4c3756ed
commit cabae7993d
11 changed files with 77 additions and 15 deletions

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.logger.Logger;
import druid.examples.flights.FlightsFirehoseFactory;
import druid.examples.rand.RandomFirehoseFactory;
@ -34,6 +35,7 @@ import io.druid.client.InventoryView;
import io.druid.client.ServerView;
import io.druid.guice.FireDepartmentsProvider;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NoopSegmentPublisherProvider;
import io.druid.guice.RealtimeManagerConfig;
@ -71,7 +73,7 @@ public class RealtimeExampleModule implements DruidModule
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class);
).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
}

View File

@ -39,6 +39,7 @@ import io.druid.timeline.DataSegment;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* Stuff that may be needed by a Task in order to conduct its business.
@ -55,6 +56,7 @@ public class TaskToolbox
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
private final ExecutorService queryExecutorService;
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
@ -68,6 +70,7 @@ public class TaskToolbox
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper
@ -82,6 +85,7 @@ public class TaskToolbox
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
@ -127,6 +131,11 @@ public class TaskToolbox
return queryRunnerFactoryConglomerate;
}
public ExecutorService getQueryExecutorService()
{
return queryExecutorService;
}
public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
@ -33,6 +34,8 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoader;
import io.druid.server.coordination.DataSegmentAnnouncer;
import java.util.concurrent.ExecutorService;
/**
* Stuff that may be needed by a Task in order to conduct its business.
*/
@ -46,6 +49,7 @@ public class TaskToolboxFactory
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler;
private final SegmentLoader segmentLoader;
private final ObjectMapper objectMapper;
@ -60,6 +64,7 @@ public class TaskToolboxFactory
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler,
SegmentLoader segmentLoader,
ObjectMapper objectMapper
@ -73,6 +78,7 @@ public class TaskToolboxFactory
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler;
this.segmentLoader = segmentLoader;
this.objectMapper = objectMapper;
@ -90,6 +96,7 @@ public class TaskToolboxFactory
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler,
segmentLoader,
objectMapper

View File

@ -116,6 +116,7 @@ public class RealtimeIndexTask extends AbstractTask
id == null
? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
:id,
String.format(
"index_realtime_%s",
schema.getDataSource()
@ -274,6 +275,7 @@ public class RealtimeIndexTask extends AbstractTask
// NOTE: "same" segment.
realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher());
realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate());
realtimePlumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService());
realtimePlumberSchool.setVersioningPolicy(versioningPolicy);
realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);

View File

@ -137,6 +137,7 @@ public class TaskLifecycleTest
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler
null, // segment loader
new DefaultObjectMapper()

View File

@ -169,6 +169,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);
@ -230,6 +231,7 @@ public class TaskQueueTest
null,
null,
null,
null,
null
);

View File

@ -115,7 +115,7 @@ public class WorkerTaskMonitorTest
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, jsonMapper
null, null, null, null, null, null, null, null, null, null, jsonMapper
)
),
new WorkerConfig().setCapacity(1)

View File

@ -19,19 +19,24 @@
package io.druid.guice;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.metamx.common.logger.Logger;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeManager;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
import java.util.Arrays;
import java.util.List;
/**
*/
public class RealtimeModule implements Module
public class RealtimeModule implements DruidModule
{
private static final Logger log = new Logger(RealtimeModule.class);
@ -46,7 +51,18 @@ public class RealtimeModule implements Module
new TypeLiteral<List<FireDepartment>>()
{
}
).toProvider(FireDepartmentsProvider.class);
).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class);
binder.bind(RealtimeManager.class).in(ManageLifecycle.class);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return Arrays.<com.fasterxml.jackson.databind.Module>asList(
new SimpleModule("RealtimeModule")
.registerSubtypes(
new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2")
)
);
}
}

View File

@ -43,6 +43,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -75,6 +76,7 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@ -91,24 +93,44 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumberSchool implements PlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private static final ListeningExecutorService EXEC = MoreExecutors.sameThreadExecutor();
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@JsonCreator
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@ -139,42 +161,41 @@ public class RealtimePlumberSchool implements PlumberSchool
this.rejectionPolicyFactory = factory;
}
@JacksonInject
public void setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
}
@JacksonInject
public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
this.conglomerate = conglomerate;
}
@JacksonInject
public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
this.dataSegmentPusher = dataSegmentPusher;
}
@JacksonInject
public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer)
{
this.segmentAnnouncer = segmentAnnouncer;
}
@JacksonInject
public void setSegmentPublisher(SegmentPublisher segmentPublisher)
{
this.segmentPublisher = segmentPublisher;
}
@JacksonInject
public void setServerView(ServerView serverView)
{
this.serverView = serverView;
}
public void setQueryExecutorService(ExecutorService executorService)
{
this.queryExecutorService = queryExecutorService;
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{
@ -262,7 +283,7 @@ public class RealtimePlumberSchool implements PlumberSchool
return toolchest.mergeResults(
factory.mergeRunners(
EXEC,
queryExecutorService,
FunctionalIterable
.create(querySinks)
.transform(
@ -277,7 +298,7 @@ public class RealtimePlumberSchool implements PlumberSchool
emitter,
builderFn,
factory.mergeRunners(
EXEC,
MoreExecutors.sameThreadExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()

View File

@ -23,6 +23,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.ServerView;
@ -114,6 +115,7 @@ public class RealtimePlumberSchoolTest
realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
realtimePlumberSchool.setServerView(serverView);
realtimePlumberSchool.setEmitter(emitter);
realtimePlumberSchool.setQueryExecutorService(MoreExecutors.sameThreadExecutor());
plumber = realtimePlumberSchool.findPlumber(schema, new FireDepartmentMetrics());
}

View File

@ -39,7 +39,7 @@ import java.util.List;
/**
*/
@Command(
name = "example realtime",
name = "realtime",
description = "Runs a standalone realtime node for examples, see https://github.com/metamx/druid/wiki/Realtime for a description"
)
public class CliRealtimeExample extends ServerRunnable