diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index 1b25279ef0d..c9984c42965 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -103,90 +103,102 @@ public class InputSourceSampler final File tempDir = FileUtils.createTempDir(); closer.register(() -> FileUtils.deleteDirectory(tempDir)); - final InputSourceReader reader = buildReader( - nonNullSamplerConfig, - nonNullDataSchema, - inputSource, - inputFormat, - tempDir - ); - try (final CloseableIterator iterator = reader.sample(); - final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema); - final Closer closer1 = closer) { - List responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows()); - int numRowsIndexed = 0; - - while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) { - final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next(); - - final List> rawColumnsList = inputRowListPlusRawValues.getRawValuesList(); - - final ParseException parseException = inputRowListPlusRawValues.getParseException(); - if (parseException != null) { - if (rawColumnsList != null) { - // add all rows to response - responseRows.addAll(rawColumnsList.stream() - .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage())) - .collect(Collectors.toList())); - } else { - // no data parsed, add one response row - responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage())); - } - continue; - } - - List inputRows = inputRowListPlusRawValues.getInputRows(); - if (inputRows == null) { - continue; - } - - for (int i = 0; i < inputRows.size(); i++) { - // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same - Map rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i); - InputRow row = inputRows.get(i); - - //keep the index of the row to be added to responseRows for further use - final int rowIndex = responseRows.size(); - IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true); - if (addResult.hasParseException()) { - responseRows.add(new SamplerResponseRow(rawColumns, null, true, addResult.getParseException().getMessage())); - } else { - // store the raw value; will be merged with the data from the IncrementalIndex later - responseRows.add(new SamplerResponseRow(rawColumns, null, null, null)); - numRowsIndexed++; - } - } - } - - final List columnNames = index.getColumnNames(); - columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN); - - for (Row row : index) { - Map parsed = new HashMap<>(); - - columnNames.forEach(k -> parsed.put(k, row.getRaw(k))); - parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch()); - - Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN); - if (sortKey != null) { - responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed)); - } - } - - // make sure size of responseRows meets the input - if (responseRows.size() > nonNullSamplerConfig.getNumRows()) { - responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows()); - } - - int numRowsRead = responseRows.size(); - return new SamplerResponse( - numRowsRead, - numRowsIndexed, - responseRows.stream() - .filter(Objects::nonNull) - .filter(x -> x.getParsed() != null || x.isUnparseable() != null) - .collect(Collectors.toList()) + try { + final InputSourceReader reader = buildReader( + nonNullSamplerConfig, + nonNullDataSchema, + inputSource, + inputFormat, + tempDir ); + try (final CloseableIterator iterator = reader.sample(); + final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema); + final Closer closer1 = closer) { + List responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows()); + int numRowsIndexed = 0; + + while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) { + final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next(); + + final List> rawColumnsList = inputRowListPlusRawValues.getRawValuesList(); + + final ParseException parseException = inputRowListPlusRawValues.getParseException(); + if (parseException != null) { + if (rawColumnsList != null) { + // add all rows to response + responseRows.addAll(rawColumnsList.stream() + .map(rawColumns -> new SamplerResponseRow( + rawColumns, + null, + true, + parseException.getMessage() + )) + .collect(Collectors.toList())); + } else { + // no data parsed, add one response row + responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage())); + } + continue; + } + + List inputRows = inputRowListPlusRawValues.getInputRows(); + if (inputRows == null) { + continue; + } + + for (int i = 0; i < inputRows.size(); i++) { + // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same + Map rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i); + InputRow row = inputRows.get(i); + + //keep the index of the row to be added to responseRows for further use + final int rowIndex = responseRows.size(); + IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true); + if (addResult.hasParseException()) { + responseRows.add(new SamplerResponseRow( + rawColumns, + null, + true, + addResult.getParseException().getMessage() + )); + } else { + // store the raw value; will be merged with the data from the IncrementalIndex later + responseRows.add(new SamplerResponseRow(rawColumns, null, null, null)); + numRowsIndexed++; + } + } + } + + final List columnNames = index.getColumnNames(); + columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN); + + for (Row row : index) { + Map parsed = new HashMap<>(); + + columnNames.forEach(k -> parsed.put(k, row.getRaw(k))); + parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch()); + + Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN); + if (sortKey != null) { + responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed)); + } + } + + // make sure size of responseRows meets the input + if (responseRows.size() > nonNullSamplerConfig.getNumRows()) { + responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows()); + } + + int numRowsRead = responseRows.size(); + return new SamplerResponse( + numRowsRead, + numRowsIndexed, + responseRows.stream() + .filter(Objects::nonNull) + .filter(x -> x.getParsed() != null || x.isUnparseable() != null) + .collect(Collectors.toList()) + ); + } } catch (Exception e) { throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());