mirror of https://github.com/apache/druid.git
fix currSize bug and maxTime bugs
This commit is contained in:
parent
e76fd18a36
commit
8af9598157
|
@ -124,7 +124,7 @@ public class DruidServer implements Comparable
|
|||
return segments.get(segmentName);
|
||||
}
|
||||
|
||||
public DruidServer addDataSegment(String segmentName, DataSegment segment)
|
||||
public DruidServer addDataSegment(String segmentId, DataSegment segment)
|
||||
{
|
||||
synchronized (lock) {
|
||||
String dataSourceName = segment.getDataSource();
|
||||
|
@ -138,10 +138,11 @@ public class DruidServer implements Comparable
|
|||
dataSources.put(dataSourceName, dataSource);
|
||||
}
|
||||
|
||||
dataSource.addSegment(segmentName, segment);
|
||||
segments.put(segmentName, segment);
|
||||
|
||||
currSize += segment.getSize();
|
||||
dataSource.addSegment(segmentId, segment);
|
||||
if (!segments.containsKey(segmentId)) {
|
||||
segments.put(segmentId, segment);
|
||||
currSize += segment.getSize();
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -156,13 +157,13 @@ public class DruidServer implements Comparable
|
|||
return this;
|
||||
}
|
||||
|
||||
public DruidServer removeDataSegment(String segmentName)
|
||||
public DruidServer removeDataSegment(String segmentId)
|
||||
{
|
||||
synchronized (lock) {
|
||||
DataSegment segment = segments.get(segmentName);
|
||||
DataSegment segment = segments.get(segmentId);
|
||||
|
||||
if (segment == null) {
|
||||
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentName);
|
||||
log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentId);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -172,18 +173,21 @@ public class DruidServer implements Comparable
|
|||
log.warn(
|
||||
"Asked to remove data segment from dataSource[%s] that doesn't exist, but the segment[%s] exists!?!?!?! wtf? server[%s]",
|
||||
segment.getDataSource(),
|
||||
segmentName,
|
||||
segmentId,
|
||||
getName()
|
||||
);
|
||||
return this;
|
||||
}
|
||||
|
||||
dataSource.removePartition(segmentName);
|
||||
segments.remove(segmentName);
|
||||
dataSource.removePartition(segmentId);
|
||||
if (segments.containsKey(segmentId)) {
|
||||
segments.remove(segmentId);
|
||||
currSize -= segment.getSize();
|
||||
}
|
||||
|
||||
if (dataSource.isEmpty()) {
|
||||
dataSources.remove(dataSource.getName());
|
||||
}
|
||||
currSize -= segment.getSize();
|
||||
}
|
||||
|
||||
return this;
|
||||
|
|
|
@ -115,11 +115,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
|
||||
{
|
||||
Interval actualIntervalTmp = interval;
|
||||
Interval dataInterval = getInterval();
|
||||
if (!actualIntervalTmp.overlaps(dataInterval)) {
|
||||
final Interval indexInterval = getInterval();
|
||||
|
||||
if (!actualIntervalTmp.overlaps(indexInterval)) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
|
||||
|
||||
if (actualIntervalTmp.getStart().isBefore(dataInterval.getStart())) {
|
||||
actualIntervalTmp = actualIntervalTmp.withStart(dataInterval.getStart());
|
||||
}
|
||||
|
@ -367,7 +370,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
final String columnName = column.toLowerCase();
|
||||
final Integer metricIndexInt = index.getMetricIndex(columnName);
|
||||
|
||||
if(metricIndexInt != null) {
|
||||
if (metricIndexInt != null) {
|
||||
final int metricIndex = metricIndexInt;
|
||||
|
||||
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
|
||||
|
@ -390,7 +393,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
|
||||
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
|
||||
|
||||
if(dimensionIndexInt != null) {
|
||||
if (dimensionIndexInt != null) {
|
||||
final int dimensionIndex = dimensionIndexInt;
|
||||
return new ObjectColumnSelector<String>()
|
||||
{
|
||||
|
@ -404,10 +407,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
public String get()
|
||||
{
|
||||
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
|
||||
if(dimVals.length == 1) return dimVals[0];
|
||||
if(dimVals.length == 0) return null;
|
||||
if (dimVals.length == 1) {
|
||||
return dimVals[0];
|
||||
}
|
||||
if (dimVals.length == 0) {
|
||||
return null;
|
||||
}
|
||||
throw new UnsupportedOperationException(
|
||||
"makeObjectColumnSelector does not support multivalued columns"
|
||||
"makeObjectColumnSelector does not support multivalued columns"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -136,11 +136,14 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
|
|||
public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
|
||||
{
|
||||
Interval actualInterval = interval;
|
||||
final Interval dataInterval = getInterval();
|
||||
if (!actualInterval.overlaps(dataInterval)) {
|
||||
final Interval indexInterval = getInterval();
|
||||
|
||||
if (!actualInterval.overlaps(indexInterval)) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
|
||||
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
|
||||
|
||||
if (actualInterval.getStart().isBefore(dataInterval.getStart())) {
|
||||
actualInterval = actualInterval.withStart(dataInterval.getStart());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue