fix day 1 issue with VIT

fjy 2013-05-28 14:26:49 -07:00
parent 54932e3187
commit bd9e6fecc1
4 changed files with 62 additions and 27 deletions
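
VIT here is VersionedIntervalTimeline. Reading the hunks below, the day-1 failure appears to be: once a newly added entry is completely covered by an existing higher-versioned key, addAtKey sets entryInterval to null, but the old loop condition still called currKey.overlaps(entryInterval). Joda-Time does not reject the null argument; it substitutes "now", so the walk could continue and leak null entries into the timeline (see the new testOverlapCausesNullEntries). A standalone sketch of that Joda-Time behavior, separate from the commit's code:

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class JodaNullMeansNowDemo
{
  public static void main(String[] args)
  {
    // In Joda-Time a null ReadableInstant means "now", so this builds an
    // interval ending at the current instant instead of throwing.
    Interval interval = new Interval(new DateTime(), (DateTime) null);
    System.out.println(interval);

    // overlaps(null) likewise treats null as a zero-length interval at "now",
    // so a loop guarded only by currKey.overlaps(entryInterval) keeps running
    // after entryInterval is nulled out; hence the explicit null check added
    // in this commit.
    System.out.println(interval.overlaps((Interval) null));
  }
}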


@@ -56,7 +56,6 @@ import com.metamx.druid.query.segment.MultipleSpecificSegmentSpec;
 import com.metamx.druid.query.segment.SegmentDescriptor;
 import com.metamx.druid.result.BySegmentResultValueClass;
 import com.metamx.druid.result.Result;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -121,7 +120,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
     final boolean useCache = Boolean.parseBoolean(query.getContextValue("useCache", "true")) && strategy != null;
-    final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true")) && strategy != null;
+    final boolean populateCache = Boolean.parseBoolean(query.getContextValue("populateCache", "true"))
+        && strategy != null;
     final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
 
     final Query<T> rewrittenQuery;
@@ -161,22 +161,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     }
 
     final byte[] queryCacheKey;
-    if(strategy != null) {
+    if (strategy != null) {
       queryCacheKey = strategy.computeCacheKey(query);
     } else {
       queryCacheKey = null;
     }
 
     // Pull cached segments from cache and remove from set of segments to query
-    if(useCache && queryCacheKey != null) {
+    if (useCache && queryCacheKey != null) {
       Map<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> cacheKeys = Maps.newHashMap();
-      for(Pair<ServerSelector, SegmentDescriptor> e : segments) {
+      for (Pair<ServerSelector, SegmentDescriptor> e : segments) {
         cacheKeys.put(e, computeSegmentCacheKey(e.lhs.getSegment().getIdentifier(), e.rhs, queryCacheKey));
       }
 
       Map<Cache.NamedKey, byte[]> cachedValues = cache.getBulk(cacheKeys.values());
 
-      for(Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
+      for (Map.Entry<Pair<ServerSelector, SegmentDescriptor>, Cache.NamedKey> entry : cacheKeys.entrySet()) {
         Pair<ServerSelector, SegmentDescriptor> segment = entry.getKey();
         Cache.NamedKey segmentCacheKey = entry.getValue();
@@ -191,8 +191,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
           // remove cached segment from set of segments to query
           segments.remove(segment);
-        }
-        else {
+        } else {
           final String segmentIdentifier = selector.getSegment().getIdentifier();
           cachePopulatorMap.put(
               String.format("%s_%s", segmentIdentifier, segmentQueryInterval),
@@ -203,22 +202,22 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     }
 
     // Compile list of all segments not pulled from cache
-    for(Pair<ServerSelector, SegmentDescriptor> segment : segments) {
+    for (Pair<ServerSelector, SegmentDescriptor> segment : segments) {
       final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
-      final DruidServer server = queryableDruidServer.getServer();
-      List<SegmentDescriptor> descriptors = serverSegments.get(server);
-      if (descriptors == null) {
-        descriptors = Lists.newArrayList();
-        serverSegments.put(server, descriptors);
-      }
-      descriptors.add(segment.rhs);
+      if (queryableDruidServer == null) {
+        log.error("No servers found for %s?! How can this be?!", segment.rhs);
+      } else {
+        final DruidServer server = queryableDruidServer.getServer();
+        List<SegmentDescriptor> descriptors = serverSegments.get(server);
+        if (descriptors == null) {
+          descriptors = Lists.newArrayList();
+          serverSegments.put(server, descriptors);
+        }
+        descriptors.add(segment.rhs);
+      }
     }
 
     return new LazySequence<T>(
@@ -242,8 +241,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
         );
 
         if (strategy == null) {
           return toolChest.mergeSequences(seq);
-        }
-        else {
+        } else {
           return strategy.mergeSequences(seq);
         }
       }
@@ -356,7 +354,11 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
     );
   }
 
-  private Cache.NamedKey computeSegmentCacheKey(String segmentIdentifier, SegmentDescriptor descriptor, byte[] queryCacheKey)
+  private Cache.NamedKey computeSegmentCacheKey(
+      String segmentIdentifier,
+      SegmentDescriptor descriptor,
+      byte[] queryCacheKey
+  )
   {
     final Interval segmentQueryInterval = descriptor.getInterval();
     final byte[] versionBytes = descriptor.getVersion().getBytes();
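
The hunk is cut off before the rest of computeSegmentCacheKey, but the visible inputs (segment identifier, query interval, version bytes, and the per-query key from strategy.computeCacheKey) point at a composite binary key. A hedged sketch of one plausible layout; the helper name, field order, and encoding are assumptions, not the commit's actual code:

import java.nio.ByteBuffer;

final class SegmentCacheKeySketch
{
  // Hypothetical helper: packs the fields visible above into a single byte[]
  // so per-segment results can be cached and bulk-fetched by exact key.
  static byte[] makeKey(
      String segmentIdentifier,
      long intervalStart,
      long intervalEnd,
      byte[] versionBytes,
      byte[] queryCacheKey
  )
  {
    byte[] idBytes = segmentIdentifier.getBytes();
    return ByteBuffer
        .allocate(idBytes.length + 16 + versionBytes.length + queryCacheKey.length)
        .put(idBytes)            // which segment
        .putLong(intervalStart)  // which slice of time was asked for
        .putLong(intervalEnd)
        .put(versionBytes)       // which version of that segment
        .put(queryCacheKey)      // which query shape
        .array();
  }
}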


@@ -149,9 +149,8 @@ public class ServerInventoryView implements ServerView, InventoryView
   public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
   {
     log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
 
-    final DataSegment segment = container.getSegment(inventoryKey);
-    if (segment != null) {
+    if (container.getSegment(inventoryKey) != null) {
       log.warn(
           "Not adding or running callbacks for existing segment[%s] on server[%s]",
           inventoryKey,


@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
 import com.metamx.druid.partition.ImmutablePartitionHolder;
 import com.metamx.druid.partition.PartitionChunk;
 import com.metamx.druid.partition.PartitionHolder;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
@@ -78,7 +79,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
     this.versionComparator = versionComparator;
   }
 
-  public void add(Interval interval, VersionType version, PartitionChunk<ObjectType> object)
+  public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
   {
     try {
       lock.writeLock().lock();
@@ -278,6 +279,13 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
     addIntervalToTimeline(interval, entry, timeline);
   }
 
+  /**
+   *
+   * @param timeline
+   * @param key
+   * @param entry
+   * @return boolean flag indicating whether or not we inserted or discarded something
+   */
   private boolean addAtKey(
       NavigableMap<Interval, TimelineEntry> timeline,
       Interval key,
@@ -292,7 +300,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
       return false;
     }
 
-    while (currKey != null && currKey.overlaps(entryInterval)) {
+    while (entryInterval != null && currKey != null && currKey.overlaps(entryInterval)) {
       Interval nextKey = timeline.higherKey(currKey);
 
       int versionCompare = versionComparator.compare(
@@ -311,7 +319,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
         if (entryInterval.getEnd().isAfter(currKey.getEnd())) {
           entryInterval = new Interval(currKey.getEnd(), entryInterval.getEnd());
         } else {
-          entryInterval = null;
+          entryInterval = null; // discard this entry
         }
       }
     } else if (versionCompare > 0) {
@@ -491,4 +499,9 @@ public class VersionedIntervalTimeline<VersionType, ObjectType>
       return partitionHolder;
     }
   }
+
+  public static void main(String[] args)
+  {
+    System.out.println(new Interval(new DateTime(), (DateTime) null));
+  }
 }
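
To see why the new entryInterval != null guard terminates the walk, here is a self-contained sketch of the loop's shape, simplified from addAtKey above (one hard-coded key and illustrative values; not the commit's code):

import org.joda.time.Interval;

public class AddAtKeySketch
{
  public static void main(String[] args)
  {
    // The candidate entry being inserted; it shrinks as existing keys cover it.
    Interval entryInterval = new Interval("2011-01-01T12/2011-01-02");
    // An existing higher-versioned key that completely covers the entry.
    Interval currKey = new Interval("2011-01-01/2011-01-02");

    while (entryInterval != null && currKey != null && currKey.overlaps(entryInterval)) {
      if (entryInterval.getEnd().isAfter(currKey.getEnd())) {
        // The entry extends past this key: keep only the uncovered tail.
        entryInterval = new Interval(currKey.getEnd(), entryInterval.getEnd());
      } else {
        // Fully covered: discard the entry. Without the entryInterval != null
        // guard, the next overlaps() call would treat null as "now".
        entryInterval = null;
      }
      currKey = null; // this demo has no further keys to visit
    }

    System.out.println("left over: " + entryInterval); // prints: left over: null
  }
}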


@@ -1068,6 +1068,27 @@ public class VersionedIntervalTimelineTest
     );
   }
 
+  //      |----3---||---1---|
+  //  |-----2------|
+  @Test
+  public void testOverlapCausesNullEntries() throws Exception
+  {
+    timeline = makeStringIntegerTimeline();
+
+    add("2011-01-01T12/2011-01-02", "3", 3);
+    add("2011-01-02/3011-01-03", "1", 1);
+    add("2011-01-01/2011-01-02", "2", 2);
+
+    assertValues(
+        Arrays.asList(
+            createExpected("2011-01-01/2011-01-01T12", "2", 2),
+            createExpected("2011-01-01T12/2011-01-02", "3", 3),
+            createExpected("2011-01-02/3011-01-03", "1", 1)
+        ),
+        timeline.lookup(new Interval("2011-01-01/3011-01-03"))
+    );
+  }
   //  1|----|        |----|
   //  2|------|    |------|
   //  3|------------------|
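
In the new test, the lower-versioned "2" chunk survives only on 2011-01-01/2011-01-01T12, the half day not shadowed by the higher-versioned "3" chunk, while "3" and "1" keep their own ranges. That overlap pattern is exactly what drives entryInterval to null inside addAtKey, which before this fix could leak null entries into the timeline instead of cleanly discarding the covered remainder.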