Accept total rows over Integer.MAX_VALUE. (#6080)

This commit is contained in:
es1220 2018-08-16 06:03:22 +09:00 committed by Jonathan Wei
parent 5ce3185b9c
commit 5726692f8f
2 changed files with 10 additions and 10 deletions

View File

@ -577,7 +577,7 @@ public class DeterminePartitionsJob implements Jobby
} }
// Respect "poisoning" (negative values mean we can't use this dimension) // Respect "poisoning" (negative values mean we can't use this dimension)
final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1); final long newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
return new DimValueCount(arg1.dim, arg1.value, newNumRows); return new DimValueCount(arg1.dim, arg1.value, newNumRows);
} }
} }
@ -620,7 +620,7 @@ public class DeterminePartitionsJob implements Jobby
// First DVC should be the total row count indicator // First DVC should be the total row count indicator
final DimValueCount firstDvc = iterator.next(); final DimValueCount firstDvc = iterator.next();
final int totalRows = firstDvc.numRows; final long totalRows = firstDvc.numRows;
if (!"".equals(firstDvc.dim) || !"".equals(firstDvc.value)) { if (!"".equals(firstDvc.dim) || !"".equals(firstDvc.value)) {
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!"); throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
@ -899,9 +899,9 @@ public class DeterminePartitionsJob implements Jobby
return distance; return distance;
} }
public int getRows() public long getRows()
{ {
int sum = 0; long sum = 0;
for (final DimPartition dimPartition : partitions) { for (final DimPartition dimPartition : partitions) {
sum += dimPartition.rows; sum += dimPartition.rows;
} }
@ -913,16 +913,16 @@ public class DeterminePartitionsJob implements Jobby
{ {
public ShardSpec shardSpec = null; public ShardSpec shardSpec = null;
public int cardinality = 0; public int cardinality = 0;
public int rows = 0; public long rows = 0;
} }
private static class DimValueCount private static class DimValueCount
{ {
public final String dim; public final String dim;
public final String value; public final String value;
public final int numRows; public final long numRows;
private DimValueCount(String dim, String value, int numRows) private DimValueCount(String dim, String value, long numRows)
{ {
this.dim = dim; this.dim = dim;
this.value = value; this.value = value;
@ -938,7 +938,7 @@ public class DeterminePartitionsJob implements Jobby
{ {
final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator(); final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
final String dim = splits.next(); final String dim = splits.next();
final int numRows = Integer.parseInt(splits.next()); final long numRows = Long.parseLong(splits.next());
final String value = splits.next(); final String value = splits.next();
return new DimValueCount(dim, value, numRows); return new DimValueCount(dim, value, numRows);

View File

@ -52,12 +52,12 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
private DatasourceIngestionSpec spec; private DatasourceIngestionSpec spec;
private IngestSegmentFirehose firehose; private IngestSegmentFirehose firehose;
private int rowNum; private long rowNum;
private Row currRow; private Row currRow;
private List<QueryableIndex> indexes = Lists.newArrayList(); private List<QueryableIndex> indexes = Lists.newArrayList();
private List<File> tmpSegmentDirs = Lists.newArrayList(); private List<File> tmpSegmentDirs = Lists.newArrayList();
private int numRows; private long numRows;
@Override @Override
public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException