mirror of https://github.com/apache/druid.git
Revert "fixed input source sampler buildReader exp"
This reverts commit e688db8
parent e688db8503
commit f46cc4faaf
@@ -103,102 +103,90 @@ public class InputSourceSampler
     final File tempDir = FileUtils.createTempDir();
     closer.register(() -> FileUtils.deleteDirectory(tempDir));
 
-    try {
-      final InputSourceReader reader = buildReader(
-          nonNullSamplerConfig,
-          nonNullDataSchema,
-          inputSource,
-          inputFormat,
-          tempDir
-      );
-      try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
-           final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
-           final Closer closer1 = closer) {
-        List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
-        int numRowsIndexed = 0;
-
-        while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
-
-          final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
-
-          final ParseException parseException = inputRowListPlusRawValues.getParseException();
-          if (parseException != null) {
-            if (rawColumnsList != null) {
-              // add all rows to response
-              responseRows.addAll(rawColumnsList.stream()
-                                                .map(rawColumns -> new SamplerResponseRow(
-                                                    rawColumns,
-                                                    null,
-                                                    true,
-                                                    parseException.getMessage()
-                                                ))
-                                                .collect(Collectors.toList()));
-            } else {
-              // no data parsed, add one response row
-              responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage()));
-            }
-            continue;
-          }
-
-          List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
-          if (inputRows == null) {
-            continue;
-          }
-
-          for (int i = 0; i < inputRows.size(); i++) {
-            // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same
-            Map<String, Object> rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i);
-            InputRow row = inputRows.get(i);
-
-            //keep the index of the row to be added to responseRows for further use
-            final int rowIndex = responseRows.size();
-            IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
-            if (addResult.hasParseException()) {
-              responseRows.add(new SamplerResponseRow(
-                  rawColumns,
-                  null,
-                  true,
-                  addResult.getParseException().getMessage()
-              ));
-            } else {
-              // store the raw value; will be merged with the data from the IncrementalIndex later
-              responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
-              numRowsIndexed++;
-            }
-          }
-        }
-
-        final List<String> columnNames = index.getColumnNames();
-        columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
-
-        for (Row row : index) {
-          Map<String, Object> parsed = new HashMap<>();
-
-          columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
-          parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
-
-          Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
-          if (sortKey != null) {
-            responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed));
-          }
-        }
-
-        // make sure size of responseRows meets the input
-        if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
-          responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
-        }
-
-        int numRowsRead = responseRows.size();
-        return new SamplerResponse(
-            numRowsRead,
-            numRowsIndexed,
-            responseRows.stream()
-                        .filter(Objects::nonNull)
-                        .filter(x -> x.getParsed() != null || x.isUnparseable() != null)
-                        .collect(Collectors.toList())
-        );
-      }
-    }
+    final InputSourceReader reader = buildReader(
+        nonNullSamplerConfig,
+        nonNullDataSchema,
+        inputSource,
+        inputFormat,
+        tempDir
+    );
+    try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
+         final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
+         final Closer closer1 = closer) {
+      List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));
+          } else {
+            // no data parsed, add one response row
+            responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage()));
+          }
+          continue;
+        }
+
+        List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
+        if (inputRows == null) {
+          continue;
+        }
+
+        for (int i = 0; i < inputRows.size(); i++) {
+          // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same
+          Map<String, Object> rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i);
+          InputRow row = inputRows.get(i);
+
+          //keep the index of the row to be added to responseRows for further use
+          final int rowIndex = responseRows.size();
+          IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
+          if (addResult.hasParseException()) {
+            responseRows.add(new SamplerResponseRow(rawColumns, null, true, addResult.getParseException().getMessage()));
+          } else {
+            // store the raw value; will be merged with the data from the IncrementalIndex later
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
+            numRowsIndexed++;
+          }
+        }
+      }
+
+      final List<String> columnNames = index.getColumnNames();
+      columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+
+      for (Row row : index) {
+        Map<String, Object> parsed = new HashMap<>();
+
+        columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
+        parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+
+        Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+        if (sortKey != null) {
+          responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed));
+        }
+      }
+
+      // make sure size of responseRows meets the input
+      if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
+        responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows());
+      }
+
+      int numRowsRead = responseRows.size();
+      return new SamplerResponse(
+          numRowsRead,
+          numRowsIndexed,
+          responseRows.stream()
+                      .filter(Objects::nonNull)
+                      .filter(x -> x.getParsed() != null || x.isUnparseable() != null)
+                      .collect(Collectors.toList())
+      );
+    }
     catch (Exception e) {
       throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());
     }
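Condensed, the change this revert makes is to move the buildReader(...) call back out of the exception-translating try block: before the revert, a failure while building the reader was rewrapped as SamplerException along with everything else; after it, such a failure propagates to the caller unwrapped, and only errors raised during sampling and indexing are translated. The sketch below illustrates only that wrap-versus-propagate difference; SamplerishException, openReader, and RevertSketch are hypothetical stand-ins, not Druid APIs.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Hypothetical stand-in for SamplerException; not a Druid class.
class SamplerishException extends RuntimeException
{
  SamplerishException(Throwable cause, String message)
  {
    super(message, cause);
  }
}

public class RevertSketch
{
  // Pre-revert shape (commit e688db8): the reader is built inside the
  // translating try, so a failure in openReader() also surfaces as
  // SamplerishException.
  static String sampleWrapped(String path)
  {
    try {
      final BufferedReader reader = openReader(path);
      try (reader) {
        return reader.readLine();
      }
    }
    catch (Exception e) {
      throw new SamplerishException(e, "Failed to sample data: " + e.getMessage());
    }
  }

  // Post-revert shape: the reader is built before the try-with-resources, so
  // a failure in openReader() propagates unwrapped; only the read itself is
  // translated.
  static String sampleUnwrapped(String path) throws IOException
  {
    final BufferedReader reader = openReader(path);
    try (reader) {
      return reader.readLine();
    }
    catch (Exception e) {
      throw new SamplerishException(e, "Failed to sample data: " + e.getMessage());
    }
  }

  // Stand-in for buildReader(...): may fail before any sampling starts.
  static BufferedReader openReader(String path) throws IOException
  {
    return new BufferedReader(new FileReader(path));
  }

  public static void main(String[] args) throws IOException
  {
    System.out.println(sampleWrapped(args[0]));
    System.out.println(sampleUnwrapped(args[0]));
  }
}

With a missing input file, sampleWrapped surfaces the wrapping SamplerishException while sampleUnwrapped throws the original FileNotFoundException; that difference in how reader-construction errors reach the caller appears to be what the reverted commit was about.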