diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index c9984c42965..1b25279ef0d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -103,102 +103,90 @@ public class InputSourceSampler final File tempDir = FileUtils.createTempDir(); closer.register(() -> FileUtils.deleteDirectory(tempDir)); - try { - final InputSourceReader reader = buildReader( - nonNullSamplerConfig, - nonNullDataSchema, - inputSource, - inputFormat, - tempDir - ); - try (final CloseableIterator iterator = reader.sample(); - final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema); - final Closer closer1 = closer) { - List responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows()); - int numRowsIndexed = 0; + final InputSourceReader reader = buildReader( + nonNullSamplerConfig, + nonNullDataSchema, + inputSource, + inputFormat, + tempDir + ); + try (final CloseableIterator iterator = reader.sample(); + final IncrementalIndex index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema); + final Closer closer1 = closer) { + List responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows()); + int numRowsIndexed = 0; - while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) { - final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next(); + while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) { + final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next(); - final List> rawColumnsList = inputRowListPlusRawValues.getRawValuesList(); + final List> rawColumnsList = inputRowListPlusRawValues.getRawValuesList(); - final ParseException parseException = inputRowListPlusRawValues.getParseException(); - if (parseException != null) { - if (rawColumnsList != null) { - // add all rows to response - responseRows.addAll(rawColumnsList.stream() - .map(rawColumns -> new SamplerResponseRow( - rawColumns, - null, - true, - parseException.getMessage() - )) - .collect(Collectors.toList())); - } else { - // no data parsed, add one response row - responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage())); - } - continue; - } - - List inputRows = inputRowListPlusRawValues.getInputRows(); - if (inputRows == null) { - continue; - } - - for (int i = 0; i < inputRows.size(); i++) { - // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same - Map rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i); - InputRow row = inputRows.get(i); - - //keep the index of the row to be added to responseRows for further use - final int rowIndex = responseRows.size(); - IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true); - if (addResult.hasParseException()) { - responseRows.add(new SamplerResponseRow( - rawColumns, - null, - true, - addResult.getParseException().getMessage() - )); - } else { - // store the raw value; will be merged with the data from the IncrementalIndex later - responseRows.add(new SamplerResponseRow(rawColumns, null, null, null)); - numRowsIndexed++; - } + final ParseException parseException = inputRowListPlusRawValues.getParseException(); + if (parseException != null) { + if (rawColumnsList != null) { + // add all rows to response + responseRows.addAll(rawColumnsList.stream() + .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage())) + .collect(Collectors.toList())); + } else { + // no data parsed, add one response row + responseRows.add(new SamplerResponseRow(null, null, true, parseException.getMessage())); } + continue; } - final List columnNames = index.getColumnNames(); - columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN); + List inputRows = inputRowListPlusRawValues.getInputRows(); + if (inputRows == null) { + continue; + } - for (Row row : index) { - Map parsed = new HashMap<>(); + for (int i = 0; i < inputRows.size(); i++) { + // InputRowListPlusRawValues guarantees the size of rawColumnsList and inputRows are the same + Map rawColumns = rawColumnsList == null ? null : rawColumnsList.get(i); + InputRow row = inputRows.get(i); - columnNames.forEach(k -> parsed.put(k, row.getRaw(k))); - parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch()); - - Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN); - if (sortKey != null) { - responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed)); + //keep the index of the row to be added to responseRows for further use + final int rowIndex = responseRows.size(); + IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true); + if (addResult.hasParseException()) { + responseRows.add(new SamplerResponseRow(rawColumns, null, true, addResult.getParseException().getMessage())); + } else { + // store the raw value; will be merged with the data from the IncrementalIndex later + responseRows.add(new SamplerResponseRow(rawColumns, null, null, null)); + numRowsIndexed++; } } - - // make sure size of responseRows meets the input - if (responseRows.size() > nonNullSamplerConfig.getNumRows()) { - responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows()); - } - - int numRowsRead = responseRows.size(); - return new SamplerResponse( - numRowsRead, - numRowsIndexed, - responseRows.stream() - .filter(Objects::nonNull) - .filter(x -> x.getParsed() != null || x.isUnparseable() != null) - .collect(Collectors.toList()) - ); } + + final List columnNames = index.getColumnNames(); + columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN); + + for (Row row : index) { + Map parsed = new HashMap<>(); + + columnNames.forEach(k -> parsed.put(k, row.getRaw(k))); + parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch()); + + Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN); + if (sortKey != null) { + responseRows.set(sortKey.intValue(), responseRows.get(sortKey.intValue()).withParsed(parsed)); + } + } + + // make sure size of responseRows meets the input + if (responseRows.size() > nonNullSamplerConfig.getNumRows()) { + responseRows = responseRows.subList(0, nonNullSamplerConfig.getNumRows()); + } + + int numRowsRead = responseRows.size(); + return new SamplerResponse( + numRowsRead, + numRowsIndexed, + responseRows.stream() + .filter(Objects::nonNull) + .filter(x -> x.getParsed() != null || x.isUnparseable() != null) + .collect(Collectors.toList()) + ); } catch (Exception e) { throw new SamplerException(e, "Failed to sample data: %s", e.getMessage());