diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index a9a97ec64af..2cf45630529 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -52,7 +52,7 @@ import com.metamx.phonebook.PhoneBook; import org.I0Itec.zkclient.ZkClient; - +import org.joda.time.Duration; import org.mortbay.jetty.Server; import org.skife.config.ConfigurationObjectFactory; @@ -333,7 +333,10 @@ public abstract class QueryableNode extends Registering { if (emitter == null) { final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle + HttpClientConfig.builder() + .withNumConnections(1) + .withReadTimeout(new Duration(PropUtils.getProperty(props, "druid.emitter.timeOut"))) + .build(), lifecycle ); setEmitter( @@ -358,7 +361,7 @@ public abstract class QueryableNode extends Registering @LifecycleStart public synchronized void start() throws Exception { - if (! initialized) { + if (!initialized) { init(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index b862644c8c1..23263bcf31e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -116,6 +116,7 @@ import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.security.AWSCredentials; +import org.joda.time.Duration; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; @@ -314,10 +315,12 @@ public class IndexerCoordinatorNode extends BaseServerNode { if (httpClient == null) { httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle + HttpClientConfig.builder().withNumConnections(1) + .withReadTimeout(new Duration(PropUtils.getProperty(props, "druid.emitter.timeOut"))) + .build(), lifecycle ); } } @@ -336,7 +339,7 @@ public class WorkerNode extends BaseServerNode private void initializeS3Service() throws S3ServiceException { - if(s3Service == null) { + if (s3Service == null) { s3Service = new RestS3Service( new AWSCredentials( PropUtils.getProperty(props, "com.metamx.aws.accessKey"), @@ -527,9 +530,12 @@ public class WorkerNode extends BaseServerNode jsonMapper = new DefaultObjectMapper(); smileMapper = new DefaultObjectMapper(new SmileFactory()); smileMapper.getJsonFactory().setCodec(smileMapper); - } - else if (jsonMapper == null || smileMapper == null) { - throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); + } else if (jsonMapper == null || smileMapper == null) { + throw new ISE( + "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", + jsonMapper, + smileMapper + ); } if (lifecycle == null) { diff --git a/pom.xml b/pom.xml index 3d206ddf77d..811b7ab8f3c 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ com.metamx http-client - 0.7.0 + 0.7.1 com.metamx diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 11fa98093f7..2068ee2b839 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -218,6 +218,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler DataSegment segment = jsonMapper.readValue(file, DataSegment.class); if (serverManager.isSegmentCached(segment)) { addSegment(segment); + } else { + log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier()); + + File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } } } catch (Exception e) { diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 0b801cd6aad..c5c70464857 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -39,8 +39,8 @@ import com.metamx.druid.coordination.ZkCoordinatorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SegmentLoader; +import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.metrics.ServerMonitor; import com.metamx.druid.query.MetricsEmittingExecutorService; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; @@ -48,13 +48,10 @@ import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.metrics.Monitor; - - import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.security.AWSCredentials; import org.mortbay.jetty.servlet.Context; -import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 5750cc3560a..19b64b42247 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -73,6 +73,7 @@ import com.netflix.curator.x.discovery.ServiceDiscovery; import com.netflix.curator.x.discovery.ServiceProvider; import org.I0Itec.zkclient.ZkClient; +import org.joda.time.Duration; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; @@ -102,7 +103,14 @@ public class MasterMain final Lifecycle lifecycle = new Lifecycle(); final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), lifecycle + HttpClientConfig.builder().withNumConnections(1).withReadTimeout( + new Duration( + PropUtils.getProperty( + props, + "druid.emitter.timeOut" + ) + ) + ).build(), lifecycle ); final ServiceEmitter emitter = new ServiceEmitter( diff --git a/server/src/main/java/com/metamx/druid/index/brita/DimensionPredicateFilter.java b/server/src/main/java/com/metamx/druid/index/brita/DimensionPredicateFilter.java index 69c636ac6a1..d88dd06d3dc 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/DimensionPredicateFilter.java +++ b/server/src/main/java/com/metamx/druid/index/brita/DimensionPredicateFilter.java @@ -22,6 +22,7 @@ package com.metamx.druid.index.brita; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.metamx.common.guava.FunctionalIterable; +import com.metamx.druid.kv.Indexed; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import javax.annotation.Nullable; @@ -45,8 +46,13 @@ class DimensionPredicateFilter implements Filter @Override public ImmutableConciseSet goConcise(final BitmapIndexSelector selector) { + Indexed dimValues = selector.getDimensionValues(dimension); + if (dimValues == null || dimValues.size() == 0 || predicate == null) { + return new ImmutableConciseSet(); + } + return ImmutableConciseSet.union( - FunctionalIterable.create(selector.getDimensionValues(dimension)) + FunctionalIterable.create(dimValues) .filter(predicate) .transform( new Function()