Update http-client dependency from 0.7.0 to 0.7.1, make the emitter read timeout configurable via druid.emitter.timeOut, and fix NPEs (missing segment cache files in ZkCoordinator; null/empty dimension values in DimensionPredicateFilter)

This commit is contained in:
Fangjin Yang 2013-04-08 11:23:08 -07:00
parent 8b100248a8
commit 925c104dd2
8 changed files with 68 additions and 24 deletions

View File

@ -52,7 +52,7 @@ import com.metamx.phonebook.PhoneBook;
import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkClient;
import org.joda.time.Duration;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
@ -333,7 +333,10 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
{ {
if (emitter == null) { if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient( final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle HttpClientConfig.builder()
.withNumConnections(1)
.withReadTimeout(new Duration(PropUtils.getProperty(props, "druid.emitter.timeOut")))
.build(), lifecycle
); );
setEmitter( setEmitter(
@ -358,7 +361,7 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
@LifecycleStart @LifecycleStart
public synchronized void start() throws Exception public synchronized void start() throws Exception
{ {
if (! initialized) { if (!initialized) {
init(); init();
} }

View File

@ -116,6 +116,7 @@ import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Duration;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.DefaultServlet;
@ -314,10 +315,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
final Context staticContext = new Context(server, "/static", Context.SESSIONS); final Context staticContext = new Context(server, "/static", Context.SESSIONS);
staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*"); staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*");
ResourceCollection resourceCollection = new ResourceCollection(new String[] { ResourceCollection resourceCollection = new ResourceCollection(
new String[]{
IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(), IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(),
IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm() IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm()
}); }
);
staticContext.setBaseResource(resourceCollection); staticContext.setBaseResource(resourceCollection);
// TODO -- Need a QueryServlet and some kind of QuerySegmentWalker if we want to support querying tasks // TODO -- Need a QueryServlet and some kind of QuerySegmentWalker if we want to support querying tasks
@ -448,7 +451,14 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
{ {
if (emitter == null) { if (emitter == null) {
final HttpClient httpClient = HttpClientInit.createClient( final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle HttpClientConfig.builder().withNumConnections(1).withReadTimeout(
new Duration(
PropUtils.getProperty(
props,
"druid.emitter.timeOut"
)
)
).build(), lifecycle
); );
emitter = new ServiceEmitter( emitter = new ServiceEmitter(
@ -602,7 +612,11 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
taskStorage = new HeapMemoryTaskStorage(); taskStorage = new HeapMemoryTaskStorage();
} else if (config.getStorageImpl().equals("db")) { } else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class); final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
taskStorage = new DbTaskStorage(getJsonMapper(), dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI()); taskStorage = new DbTaskStorage(
getJsonMapper(),
dbConnectorConfig,
new DbConnector(dbConnectorConfig).getDBI()
);
} else { } else {
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl()); throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
} }
@ -754,9 +768,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNod
jsonMapper = new DefaultObjectMapper(); jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory()); smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper); smileMapper.getJsonFactory().setCodec(smileMapper);
} } else if (jsonMapper == null || smileMapper == null) {
else if (jsonMapper == null || smileMapper == null) { throw new ISE(
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
jsonMapper,
smileMapper
);
} }
if (lifecycle == null) { if (lifecycle == null) {

View File

@ -81,6 +81,7 @@ import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Duration;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.DefaultServlet;
@ -317,7 +318,9 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
{ {
if (httpClient == null) { if (httpClient == null) {
httpClient = HttpClientInit.createClient( httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle HttpClientConfig.builder().withNumConnections(1)
.withReadTimeout(new Duration(PropUtils.getProperty(props, "druid.emitter.timeOut")))
.build(), lifecycle
); );
} }
} }
@ -336,7 +339,7 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
private void initializeS3Service() throws S3ServiceException private void initializeS3Service() throws S3ServiceException
{ {
if(s3Service == null) { if (s3Service == null) {
s3Service = new RestS3Service( s3Service = new RestS3Service(
new AWSCredentials( new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"), PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
@ -527,9 +530,12 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
jsonMapper = new DefaultObjectMapper(); jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory()); smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper); smileMapper.getJsonFactory().setCodec(smileMapper);
} } else if (jsonMapper == null || smileMapper == null) {
else if (jsonMapper == null || smileMapper == null) { throw new ISE(
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
jsonMapper,
smileMapper
);
} }
if (lifecycle == null) { if (lifecycle == null) {

View File

@ -64,7 +64,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>http-client</artifactId> <artifactId>http-client</artifactId>
<version>0.7.0</version> <version>0.7.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>

View File

@ -218,6 +218,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler
DataSegment segment = jsonMapper.readValue(file, DataSegment.class); DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) { if (serverManager.isSegmentCached(segment)) {
addSegment(segment); addSegment(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
} }
} }
catch (Exception e) { catch (Exception e) {

View File

@ -39,8 +39,8 @@ import com.metamx.druid.coordination.ZkCoordinatorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoader; import com.metamx.druid.loading.SegmentLoader;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.metrics.ServerMonitor;
import com.metamx.druid.query.MetricsEmittingExecutorService; import com.metamx.druid.query.MetricsEmittingExecutorService;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
@ -48,13 +48,10 @@ import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.Monitor; import com.metamx.metrics.Monitor;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;

View File

@ -73,6 +73,7 @@ import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider; import com.netflix.curator.x.discovery.ServiceProvider;
import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkClient;
import org.joda.time.Duration;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.DefaultServlet;
@ -102,7 +103,14 @@ public class MasterMain
final Lifecycle lifecycle = new Lifecycle(); final Lifecycle lifecycle = new Lifecycle();
final HttpClient httpClient = HttpClientInit.createClient( final HttpClient httpClient = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(), lifecycle HttpClientConfig.builder().withNumConnections(1).withReadTimeout(
new Duration(
PropUtils.getProperty(
props,
"druid.emitter.timeOut"
)
)
).build(), lifecycle
); );
final ServiceEmitter emitter = new ServiceEmitter( final ServiceEmitter emitter = new ServiceEmitter(

View File

@ -22,6 +22,7 @@ package com.metamx.druid.index.brita;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.kv.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -45,8 +46,13 @@ class DimensionPredicateFilter implements Filter
@Override @Override
public ImmutableConciseSet goConcise(final BitmapIndexSelector selector) public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
{ {
Indexed<String> dimValues = selector.getDimensionValues(dimension);
if (dimValues == null || dimValues.size() == 0 || predicate == null) {
return new ImmutableConciseSet();
}
return ImmutableConciseSet.union( return ImmutableConciseSet.union(
FunctionalIterable.create(selector.getDimensionValues(dimension)) FunctionalIterable.create(dimValues)
.filter(predicate) .filter(predicate)
.transform( .transform(
new Function<String, ImmutableConciseSet>() new Function<String, ImmutableConciseSet>()