fix nulls in realtime persist

This commit is contained in:
fjy 2013-12-05 17:59:02 -08:00
parent 6f079bcc8f
commit 5ba05a46f6
3 changed files with 29 additions and 3 deletions

View File

@ -52,4 +52,13 @@ public class PagingSpec
{
return threshold;
}
@Override
public String toString()
{
return "PagingSpec{" +
"pagingIdentifiers=" + pagingIdentifiers +
", threshold=" + threshold +
'}';
}
}

View File

@ -118,7 +118,9 @@ public class SelectQueryEngine
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() <= 1) {
if (vals.size() == 0) {
continue;
} else if (vals.size() <= 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {

View File

@ -43,6 +43,7 @@ import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
@ -495,7 +496,16 @@ public class RealtimePlumber implements Plumber
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
new QueryableIndexSegment(
DataSegment.makeDataSegmentIdentifier(
schema.getDataSource(),
sinkInterval.getStart(),
sinkInterval.getEnd(),
versioningPolicy.getVersion(sinkInterval),
new NoneShardSpec()
),
IndexIO.loadIndex(segmentDir)
),
Integer.parseInt(segmentDir.getName())
)
);
@ -646,7 +656,12 @@ public class RealtimePlumber implements Plumber
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
return numRows;
}