diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 6e3c052c4b3..bb1b18ecb7c 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -56,7 +56,6 @@ import com.metamx.druid.query.segment.MultipleSpecificSegmentSpec; import com.metamx.druid.query.segment.SegmentDescriptor; import com.metamx.druid.result.BySegmentResultValueClass; import com.metamx.druid.result.Result; - import org.joda.time.DateTime; import org.joda.time.Interval; @@ -121,7 +120,8 @@ public class CachingClusteredClient implements QueryRunner final Map cachePopulatorMap = Maps.newHashMap(); final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null; - final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null; + final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) + && strategy != null; final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); final Query rewrittenQuery; @@ -161,22 +161,22 @@ public class CachingClusteredClient implements QueryRunner } final byte[] queryCacheKey; - if(strategy != null) { + if (strategy != null) { queryCacheKey = strategy.computeCacheKey(query); } else { queryCacheKey = null; } // Pull cached segments from cache and remove from set of segments to query - if(useCache && queryCacheKey != null) { + if (useCache && queryCacheKey != null) { Map, Cache.NamedKey> cacheKeys = Maps.newHashMap(); - for(Pair e : segments) { + for (Pair e : segments) { cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey)); } Map cachedValues = cache.getBulk(cacheKeys.values()); - for(Map.Entry, Cache.NamedKey> entry : cacheKeys.entrySet()) { + for (Map.Entry, Cache.NamedKey> entry : cacheKeys.entrySet()) { Pair segment = entry.getKey(); Cache.NamedKey segmentCacheKey = entry.getValue(); @@ -191,8 +191,7 @@ public class CachingClusteredClient implements QueryRunner // remove cached segment from set of segments to query segments.remove(segment); - } - else { + } else { final String segmentIdentifier = selector.getSegment().getIdentifier(); cachePopulatorMap.put( String.format("%s_%s", segmentIdentifier, segmentQueryInterval), @@ -203,22 +202,22 @@ public class CachingClusteredClient implements QueryRunner } // Compile list of all segments not pulled from cache - for(Pair segment : segments) { + for (Pair segment : segments) { final QueryableDruidServer queryableDruidServer = segment.lhs.pick(); if (queryableDruidServer == null) { log.error("No servers found for %s?! How can this be?!", segment.rhs); + } else { + final DruidServer server = queryableDruidServer.getServer(); + List descriptors = serverSegments.get(server); + + if (descriptors == null) { + descriptors = Lists.newArrayList(); + serverSegments.put(server, descriptors); + } + + descriptors.add(segment.rhs); } - - final DruidServer server = queryableDruidServer.getServer(); - List descriptors = serverSegments.get(server); - - if (descriptors == null) { - descriptors = Lists.newArrayList(); - serverSegments.put(server, descriptors); - } - - descriptors.add(segment.rhs); } return new LazySequence( @@ -242,8 +241,7 @@ public class CachingClusteredClient implements QueryRunner ); if (strategy == null) { return toolChest.mergeSequences(seq); - } - else { + } else { return strategy.mergeSequences(seq); } } @@ -356,7 +354,11 @@ public class CachingClusteredClient implements QueryRunner ); } - private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey) + private Cache.NamedKey computeSegmentCacheKey( + String segmentIdentifier, + SegmentDescriptor descriptor, + byte[] queryCacheKey + ) { final Interval segmentQueryInterval = descriptor.getInterval(); final byte[] versionBytes = descriptor.getVersion().getBytes(); diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java index dc1156fe966..c98091e0560 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java @@ -149,9 +149,8 @@ public class ServerInventoryView implements ServerView, InventoryView public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory) { log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey); - final DataSegment segment = container.getSegment(inventoryKey); - if (segment != null) { + if (container.getSegment(inventoryKey) != null) { log.warn( "Not adding or running callbacks for existing segment[%s] on server[%s]", inventoryKey, diff --git a/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java b/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java index bee025c8270..fcbdfc6502b 100644 --- a/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java +++ b/common/src/main/java/com/metamx/druid/VersionedIntervalTimeline.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.partition.ImmutablePartitionHolder; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.partition.PartitionHolder; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.ArrayList; @@ -78,7 +79,7 @@ public class VersionedIntervalTimeline this.versionComparator = versionComparator; } - public void add(Interval interval, VersionType version, PartitionChunk object) + public void add(final Interval interval, VersionType version, PartitionChunk object) { try { lock.writeLock().lock(); @@ -278,6 +279,13 @@ public class VersionedIntervalTimeline addIntervalToTimeline(interval, entry, timeline); } + /** + * + * @param timeline + * @param key + * @param entry + * @return boolean flag indicating whether or not we inserted or discarded something + */ private boolean addAtKey( NavigableMap timeline, Interval key, @@ -292,7 +300,7 @@ public class VersionedIntervalTimeline return false; } - while (currKey != null && currKey.overlaps(entryInterval)) { + while (entryInterval != null && currKey != null && currKey.overlaps(entryInterval)) { Interval nextKey = timeline.higherKey(currKey); int versionCompare = versionComparator.compare( @@ -311,7 +319,7 @@ public class VersionedIntervalTimeline if (entryInterval.getEnd().isAfter(currKey.getEnd())) { entryInterval = new Interval(currKey.getEnd(), entryInterval.getEnd()); } else { - entryInterval = null; + entryInterval = null; // discard this entry } } } else if (versionCompare > 0) { @@ -491,4 +499,9 @@ public class VersionedIntervalTimeline return partitionHolder; } } + + public static void main(String[] args) + { + System.out.println(new Interval(new DateTime(), (DateTime) null)); + } } diff --git a/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java b/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java index 96d0fa01ce1..80068a6dd6e 100644 --- a/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/com/metamx/druid/VersionedIntervalTimelineTest.java @@ -1068,6 +1068,27 @@ public class VersionedIntervalTimelineTest ); } + // |----3---||---1---| + // |---2---| + @Test + public void testOverlapCausesNullEntries() throws Exception + { + timeline = makeStringIntegerTimeline(); + + add("2011-01-01T12/2011-01-02", "3", 3); + add("2011-01-02/3011-01-03", "1", 1); + add("2011-01-01/2011-01-02", "2", 2); + + assertValues( + Arrays.asList( + createExpected("2011-01-01/2011-01-01T12", "2", 2), + createExpected("2011-01-01T12/2011-01-02", "3", 3), + createExpected("2011-01-02/3011-01-03", "1", 1) + ), + timeline.lookup(new Interval("2011-01-01/3011-01-03")) + ); + } + // 1|----| |----| // 2|------| |------| // 3|------------------|