Use shared query executor in indexing-service

This commit is contained in:
Gian Merlino 2013-09-13 14:19:42 -07:00
parent 242476a73f
commit 24df6c482d
7 changed files with 24 additions and 19 deletions

View File

@ -43,6 +43,7 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File; import java.io.File;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
/** /**
* Stuff that may be needed by a Task in order to conduct its business. * Stuff that may be needed by a Task in order to conduct its business.
@ -59,6 +60,7 @@ public class TaskToolbox
private final DataSegmentAnnouncer segmentAnnouncer; private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView; private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler; private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -73,6 +75,7 @@ public class TaskToolbox
DataSegmentAnnouncer segmentAnnouncer, DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView, ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler, MonitorScheduler monitorScheduler,
ObjectMapper objectMapper ObjectMapper objectMapper
) )
@ -87,6 +90,7 @@ public class TaskToolbox
this.segmentAnnouncer = segmentAnnouncer; this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView; this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler; this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@ -131,6 +135,11 @@ public class TaskToolbox
return queryRunnerFactoryConglomerate; return queryRunnerFactoryConglomerate;
} }
public ExecutorService getQueryExecutorService()
{
return queryExecutorService;
}
public MonitorScheduler getMonitorScheduler() public MonitorScheduler getMonitorScheduler()
{ {
return monitorScheduler; return monitorScheduler;

View File

@ -32,6 +32,8 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorScheduler;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.concurrent.ExecutorService;
/** /**
* Stuff that may be needed by a Task in order to conduct its business. * Stuff that may be needed by a Task in order to conduct its business.
*/ */
@ -46,6 +48,7 @@ public class TaskToolboxFactory
private final DataSegmentAnnouncer segmentAnnouncer; private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView; private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
private final MonitorScheduler monitorScheduler; private final MonitorScheduler monitorScheduler;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@ -59,6 +62,7 @@ public class TaskToolboxFactory
DataSegmentAnnouncer segmentAnnouncer, DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView, ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
MonitorScheduler monitorScheduler, MonitorScheduler monitorScheduler,
ObjectMapper objectMapper ObjectMapper objectMapper
) )
@ -72,6 +76,7 @@ public class TaskToolboxFactory
this.segmentAnnouncer = segmentAnnouncer; this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView; this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
this.monitorScheduler = monitorScheduler; this.monitorScheduler = monitorScheduler;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@ -89,6 +94,7 @@ public class TaskToolboxFactory
segmentAnnouncer, segmentAnnouncer,
newSegmentServerView, newSegmentServerView,
queryRunnerFactoryConglomerate, queryRunnerFactoryConglomerate,
queryExecutorService,
monitorScheduler, monitorScheduler,
objectMapper objectMapper
); );

View File

@ -274,6 +274,7 @@ public class RealtimeIndexTask extends AbstractTask
// NOTE: "same" segment. // NOTE: "same" segment.
realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher()); realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher());
realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate()); realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate());
realtimePlumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService());
realtimePlumberSchool.setVersioningPolicy(versioningPolicy); realtimePlumberSchool.setVersioningPolicy(versioningPolicy);
realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer); realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher); realtimePlumberSchool.setSegmentPublisher(segmentPublisher);

View File

@ -65,7 +65,6 @@ import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientConfig;
@ -108,7 +107,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private RestS3Service s3Service = null; private RestS3Service s3Service = null;
private MonitorScheduler monitorScheduler = null; private MonitorScheduler monitorScheduler = null;
private HttpClient httpClient = null; private HttpClient httpClient = null;
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null; private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null; private WorkerConfig workerConfig = null;
private DataSegmentPusher segmentPusher = null; private DataSegmentPusher segmentPusher = null;
@ -143,7 +141,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void doInit() throws Exception public void doInit() throws Exception
{ {
initializeHttpClient(); initializeHttpClient();
initializeEmitter();
initializeS3Service(); initializeS3Service();
initializeMergerConfig(); initializeMergerConfig();
initializeServiceDiscovery(); initializeServiceDiscovery();
@ -173,7 +170,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0); root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0);
root.addServlet( root.addServlet(
new ServletHolder( new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger()) new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, getEmitter(), getRequestLogger())
), ),
"/druid/v2/*" "/druid/v2/*"
); );
@ -186,7 +183,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d"); final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
this.monitorScheduler = new MonitorScheduler( this.monitorScheduler = new MonitorScheduler(
configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, emitter, ImmutableList.<Monitor>of() configFactory.build(MonitorSchedulerConfig.class), globalScheduledExec, getEmitter(), ImmutableList.<Monitor>of()
); );
lifecycle.addManagedInstance(monitorScheduler); lifecycle.addManagedInstance(monitorScheduler);
} }
@ -272,18 +269,6 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
} }
} }
private void initializeEmitter()
{
if (emitter == null) {
emitter = new ServiceEmitter(
PropUtils.getProperty(props, "druid.service"),
PropUtils.getProperty(props, "druid.host"),
Emitters.create(props, httpClient, getJsonMapper(), lifecycle)
);
}
EmittingLogger.registerEmitter(emitter);
}
private void initializeS3Service() throws S3ServiceException private void initializeS3Service() throws S3ServiceException
{ {
if (s3Service == null) { if (s3Service == null) {
@ -331,13 +316,14 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
), ),
getJsonMapper() getJsonMapper()
), ),
emitter, getEmitter(),
s3Service, s3Service,
segmentPusher, segmentPusher,
dataSegmentKiller, dataSegmentKiller,
getAnnouncer(), getAnnouncer(),
getServerView(), getServerView(),
getConglomerate(), getConglomerate(),
getQueryExecutorService(),
monitorScheduler, monitorScheduler,
getJsonMapper() getJsonMapper()
); );

View File

@ -385,7 +385,7 @@ public class RemoteTaskRunnerTest
{ {
return null; return null;
} }
}, null, null, null, null, null, null, null, null, null, jsonMapper }, null, null, null, null, null, null, null, null, null, null, jsonMapper
), Executors.newSingleThreadExecutor() ), Executors.newSingleThreadExecutor()
), ),
Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor()

View File

@ -160,6 +160,7 @@ public class TaskLifecycleTest
null, // segment announcer null, // segment announcer
null, // new segment server view null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler null, // monitor scheduler
new DefaultObjectMapper() new DefaultObjectMapper()
); );

View File

@ -169,6 +169,7 @@ public class TaskQueueTest
null, null,
null, null,
null, null,
null,
null null
); );
@ -230,6 +231,7 @@ public class TaskQueueTest
null, null,
null, null,
null, null,
null,
null null
); );