Add InputStats to track bytes processed by a task (#13520)

This commit adds a new interface `InputStats` to track the total bytes processed by a task.
The field `processedBytes` is published in task reports along with other row stats.

Major changes:
- Add interface `InputStats` to track processed bytes
- Add method `InputSourceReader.read(InputStats)` to read input rows while counting bytes
> Since the bytes need to be counted, we cannot simply wrap an `InputSourceReader` or `InputEntityReader` (the way `CountableInputSourceReader` does): the `InputSourceReader` deals only in `InputRow`s, by which point the byte-level information has already been lost.
- Classic batch: Use the new `InputSourceReader.read(inputStats)` in `AbstractBatchIndexTask` (a usage sketch follows these lists)
- Streaming: Increment `processedBytes` directly in `StreamChunkParser`; this path does not go through the new `InputSourceReader.read(inputStats)` method
- Make `RowIngestionMeters` extend `InputStats` so that the processed bytes can be exposed in task reports

Other changes:
- Update tests to verify the value of `processedBytes`
- Rename `MutableRowIngestionMeters` to `SimpleRowIngestionMeters` and remove duplicate class
- Replace `CacheTestSegmentCacheManager` with `NoopSegmentCacheManager`
- Refactor `KafkaIndexTaskTest` and `KinesisIndexTaskTest`
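
For orientation, here is a minimal sketch of the batch-style wiring described above, assuming an already-constructed `InputSourceReader`. The task's meters object (a `SimpleRowIngestionMeters` here) now also acts as the `InputStats` passed to `read(...)`, so the bytes it accumulates are the ones surfaced as `processedBytes` in the task report. The class and method names of the sketch itself are illustrative, not actual task code.

```java
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;

import java.io.IOException;

public class ProcessedBytesSketch
{
  // Illustrative only: iterate an already-built reader the way a batch task would,
  // letting the same meters object count both rows and input bytes.
  static long ingest(InputSourceReader reader) throws IOException
  {
    final SimpleRowIngestionMeters meters = new SimpleRowIngestionMeters();
    try (CloseableIterator<InputRow> rows = reader.read(meters)) {
      while (rows.hasNext()) {
        rows.next();                 // a real task hands the row to segment generation
        meters.incrementProcessed();
      }
    }
    return meters.getProcessedBytes(); // reported as "processedBytes" alongside other row stats
  }
}
```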
Kashif Faraz 2022-12-13 18:54:42 +05:30 committed by GitHub
parent 7682b0b6b1
commit 58a3acc2c4
56 changed files with 1783 additions and 2369 deletions

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.google.common.base.Predicate;
import com.google.common.io.CountingInputStream;
import org.apache.druid.java.util.common.FileUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
/**
* Wrapper around an {@link InputEntity} that counts the number of bytes read.
*/
public class BytesCountingInputEntity implements InputEntity
{
private final InputStats inputStats;
private final InputEntity baseInputEntity;
public BytesCountingInputEntity(InputEntity baseInputEntity, InputStats inputStats)
{
this.baseInputEntity = baseInputEntity;
this.inputStats = inputStats;
}
@Nullable
@Override
public URI getUri()
{
return baseInputEntity.getUri();
}
@Override
public InputStream open() throws IOException
{
return new BytesCountingInputStream(baseInputEntity.open(), inputStats);
}
public InputEntity getBaseInputEntity()
{
return baseInputEntity;
}
@Override
public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
{
final CleanableFile cleanableFile = baseInputEntity.fetch(temporaryDirectory, fetchBuffer);
inputStats.incrementProcessedBytes(FileUtils.getFileSize(cleanableFile.file()));
return cleanableFile;
}
@Override
public Predicate<Throwable> getRetryCondition()
{
return baseInputEntity.getRetryCondition();
}
/**
* Wraps an input stream, and counts the number of bytes read.
* <p>
* Similar to {@link CountingInputStream} but does not reset count on call to
* {@link CountingInputStream#reset()}.
*/
private static class BytesCountingInputStream extends FilterInputStream
{
private final InputStats inputStats;
BytesCountingInputStream(@Nullable InputStream in, InputStats inputStats)
{
super(in);
this.inputStats = inputStats;
}
@Override
public int read() throws IOException
{
int result = in.read();
if (result != -1) {
inputStats.incrementProcessedBytes(1);
}
return result;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int result = in.read(b, off, len);
if (result != -1) {
inputStats.incrementProcessedBytes(result);
}
return result;
}
@Override
public long skip(long n) throws IOException
{
long result = in.skip(n);
inputStats.incrementProcessedBytes(result);
return result;
}
}
}
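
A small usage sketch of the wrapper above, mirroring the tests added later in this commit; the input path is hypothetical:

```java
import org.apache.druid.data.input.BytesCountingInputEntity;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.InputStatsImpl;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

public class BytesCountingSketch
{
  public static void main(String[] args) throws IOException
  {
    final InputStats inputStats = new InputStatsImpl();
    // "/tmp/example.csv" is a hypothetical input file.
    final BytesCountingInputEntity entity =
        new BytesCountingInputEntity(new FileEntity(new File("/tmp/example.csv")), inputStats);

    final byte[] buffer = new byte[8192];
    try (InputStream in = entity.open()) {
      while (in.read(buffer) != -1) {
        // discard the data; only the running byte count matters here
      }
    }
    System.out.println("bytes read = " + inputStats.getProcessedBytes());
  }
}
```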

View File

@ -37,7 +37,12 @@ import java.io.IOException;
@UnstableApi
public interface InputSourceReader
{
CloseableIterator<InputRow> read() throws IOException;
default CloseableIterator<InputRow> read() throws IOException
{
return read(null);
}
CloseableIterator<InputRow> read(InputStats inputStats) throws IOException;
CloseableIterator<InputRowListPlusRawValues> sample() throws IOException;
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
/**
* Tracks bytes read from an input source.
*/
public interface InputStats
{
void incrementProcessedBytes(long incrementByValue);
long getProcessedBytes();
}

View File

@ -24,6 +24,7 @@ import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import java.io.File;
@ -48,7 +49,7 @@ public class FirehoseToInputSourceReaderAdaptor implements InputSourceReader
}
@Override
public CloseableIterator<InputRow> read() throws IOException
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{
return new CloseableIterator<InputRow>()
{

View File

@ -19,6 +19,7 @@
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.BytesCountingInputEntity;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@ -26,6 +27,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -69,12 +71,13 @@ public class InputEntityIteratingReader implements InputSourceReader
}
@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
try {
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
final InputEntity entityToRead = inputStats == null ? entity : new BytesCountingInputEntity(entity, inputStats);
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entityToRead, temporaryDirectory);
return reader.read();
}
catch (IOException e) {

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@ -47,10 +48,10 @@ public class TimedShutoffInputSourceReader implements InputSourceReader
}
@Override
public CloseableIterator<InputRow> read() throws IOException
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{
final ScheduledExecutorService shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-reader-%d");
final CloseableIterator<InputRow> delegateIterator = delegate.read();
final CloseableIterator<InputRow> delegateIterator = delegate.read(inputStats);
return decorateShutdownTimeout(shutdownExec, delegateIterator);
}

View File

@ -388,6 +388,29 @@ public class FileUtils
}
}
/**
* Computes the size of the file. If it is a directory, computes the size up
* to a depth of 1.
*/
public static long getFileSize(File file)
{
if (file == null) {
return 0;
} else if (file.isDirectory()) {
File[] children = file.listFiles();
if (children == null) {
return 0;
}
long totalSize = 0;
for (File child : children) {
totalSize += child.length();
}
return totalSize;
} else {
return file.length();
}
}
/**
* Creates a temporary directory inside the configured temporary space (java.io.tmpdir). Similar to the method
* {@link com.google.common.io.Files#createTempDir()} from Guava, but has nicer error messages.
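
A quick illustration of the `getFileSize` semantics above, using hypothetical temp files: a regular file reports its own length, and a directory sums only its direct children (the method does not recurse):

```java
import org.apache.druid.java.util.common.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class GetFileSizeSketch
{
  public static void main(String[] args) throws IOException
  {
    final File dir = Files.createTempDirectory("getFileSizeSketch").toFile();
    Files.write(new File(dir, "a.bin").toPath(), new byte[100]);
    Files.write(new File(dir, "b.bin").toPath(), new byte[50]);

    System.out.println(FileUtils.getFileSize(dir));                    // 150: sum of direct children
    System.out.println(FileUtils.getFileSize(new File(dir, "a.bin"))); // 100: the file's own length
  }
}
```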

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class BytesCountingInputEntityTest
{
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private InputStats inputStats;
@Before
public void setup()
{
inputStats = new InputStatsImpl();
}
@Test
public void testFetch() throws IOException
{
final int fileSize = 200;
final File sourceFile = folder.newFile("testWithFileEntity");
writeBytesToFile(sourceFile, fileSize);
final BytesCountingInputEntity inputEntity = new BytesCountingInputEntity(new FileEntity(sourceFile), inputStats);
inputEntity.fetch(folder.newFolder(), new byte[50]);
Assert.assertEquals(fileSize, inputStats.getProcessedBytes());
}
@Test
public void testFetchFromPartiallyReadFile() throws IOException
{
final int fileSize = 200;
final File sourceFile = folder.newFile("testWithFileEntity");
writeBytesToFile(sourceFile, fileSize);
final int bufferSize = 50;
final byte[] intermediateBuffer = new byte[bufferSize];
// Read the file partially
final BytesCountingInputEntity inputEntity = new BytesCountingInputEntity(new FileEntity(sourceFile), inputStats);
inputEntity.open().read(intermediateBuffer);
Assert.assertEquals(bufferSize, inputStats.getProcessedBytes());
// Read the whole file again
inputEntity.fetch(folder.newFolder(), intermediateBuffer);
Assert.assertEquals(fileSize + bufferSize, inputStats.getProcessedBytes());
}
@Test
public void testFetchFromDirectory() throws IOException
{
final File sourceDir = folder.newFolder("testWithDirectory");
final int fileSize1 = 100;
final File sourceFile1 = new File(sourceDir, "file1");
writeBytesToFile(sourceFile1, fileSize1);
final int fileSize2 = 200;
final File sourceFile2 = new File(sourceDir, "file2");
writeBytesToFile(sourceFile2, fileSize2);
final BytesCountingInputEntity inputEntity = new BytesCountingInputEntity(new FileEntity(sourceDir), inputStats);
inputEntity.fetch(folder.newFolder(), new byte[1000]);
Assert.assertEquals(fileSize1 + fileSize2, inputStats.getProcessedBytes());
}
@Test
public void testOpen() throws IOException
{
final int entitySize = 100;
final BytesCountingInputEntity inputEntity = new BytesCountingInputEntity(
new ByteEntity(new byte[entitySize]),
inputStats
);
inputEntity.open().read(new byte[200]);
Assert.assertEquals(entitySize, inputStats.getProcessedBytes());
}
@Test
public void testOpenWithSmallBuffer() throws IOException
{
final int entitySize = 100;
final int bufferSize = 50;
final BytesCountingInputEntity inputEntity = new BytesCountingInputEntity(
new ByteEntity(new byte[entitySize]),
inputStats
);
inputEntity.open().read(new byte[bufferSize]);
Assert.assertEquals(bufferSize, inputStats.getProcessedBytes());
}
private void writeBytesToFile(File sourceFile, int numBytes) throws IOException
{
if (!sourceFile.exists()) {
sourceFile.createNewFile();
}
final OutputStreamWriter outputStreamWriter = new OutputStreamWriter(
new FileOutputStream(sourceFile),
StandardCharsets.UTF_8
);
char[] chars = new char[numBytes];
Arrays.fill(chars, ' ');
outputStreamWriter.write(chars);
outputStreamWriter.flush();
outputStreamWriter.close();
}
}

View File

@ -24,9 +24,11 @@ import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -42,7 +44,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
public class InputEntityIteratingReaderTest
public class InputEntityIteratingReaderTest extends InitializedNullHandlingTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -52,6 +54,7 @@ public class InputEntityIteratingReaderTest
{
final int numFiles = 5;
final List<File> files = new ArrayList<>();
long totalFileSize = 0;
for (int i = 0; i < numFiles; i++) {
final File file = temporaryFolder.newFile("test_" + i);
files.add(file);
@ -59,8 +62,10 @@ public class InputEntityIteratingReaderTest
writer.write(StringUtils.format("%d,%s,%d\n", 20190101 + i, "name_" + i, i));
writer.write(StringUtils.format("%d,%s,%d", 20190102 + i, "name_" + (i + 1), i + 1));
}
totalFileSize += file.length();
}
final InputEntityIteratingReader firehose = new InputEntityIteratingReader(
final InputEntityIteratingReader reader = new InputEntityIteratingReader(
new InputRowSchema(
new TimestampSpec("time", "yyyyMMdd", null),
new DimensionsSpec(
@ -79,7 +84,8 @@ public class InputEntityIteratingReaderTest
temporaryFolder.newFolder()
);
try (CloseableIterator<InputRow> iterator = firehose.read()) {
final InputStats inputStats = new InputStatsImpl();
try (CloseableIterator<InputRow> iterator = reader.read(inputStats)) {
int i = 0;
while (iterator.hasNext()) {
InputRow row = iterator.next();
@ -95,6 +101,7 @@ public class InputEntityIteratingReaderTest
i++;
}
Assert.assertEquals(numFiles, i);
Assert.assertEquals(totalFileSize, inputStats.getProcessedBytes());
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputStats;
public class InputStatsImpl implements InputStats
{
private long processedBytes;
@Override
public void incrementProcessedBytes(long incrementByValue)
{
processedBytes += incrementByValue;
}
@Override
public long getProcessedBytes()
{
return processedBytes;
}
}

View File

@ -40,7 +40,6 @@ import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.Set;
public class FunctionTest extends InitializedNullHandlingTest
@ -525,7 +524,7 @@ public class FunctionTest extends InitializedNullHandlingTest
);
for (Pair<String, String> argAndType : invalidArguments) {
try {
assertExpr(String.format(Locale.ENGLISH, "round(d, %s)", argAndType.lhs), null);
assertExpr(StringUtils.format("round(d, %s)", argAndType.lhs), null);
Assert.fail("Did not throw IllegalArgumentException");
}
catch (ExpressionValidationException e) {

View File

@ -44,10 +44,12 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
@ -481,7 +483,8 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
@ -490,6 +493,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
Assert.assertEquals(2 * CONTENT.length, inputStats.getProcessedBytes());
EasyMock.verify(OSSCLIENT);
}
@ -525,7 +529,8 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
@ -534,6 +539,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
Assert.assertEquals(2 * CONTENT.length, inputStats.getProcessedBytes());
EasyMock.verify(OSSCLIENT);
}

View File

@ -36,10 +36,12 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
@ -295,7 +297,8 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
@ -303,6 +306,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
Assert.assertEquals(2 * CONTENT.length, inputStats.getProcessedBytes());
}
@Test
@ -339,7 +343,8 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
@ -347,6 +352,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
Assert.assertEquals(2 * CONTENT.length, inputStats.getProcessedBytes());
}
private static void addExpectedPrefixObjects(URI prefix, List<URI> uris) throws IOException

View File

@ -31,9 +31,11 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
@ -59,6 +61,7 @@ import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -259,11 +262,13 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
private HdfsInputSource target;
private Set<Path> paths;
private Map<Long, String> timestampToValue;
private List<String> fileContents;
@Before
public void setup() throws IOException
{
timestampToValue = new HashMap<>();
fileContents = new ArrayList<>();
File dir = temporaryFolder.getRoot();
Configuration configuration = new Configuration(true);
@ -276,11 +281,10 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
i -> {
char value = ALPHABET.charAt(i % ALPHABET.length());
timestampToValue.put((long) i, Character.toString(value));
return createFile(
fileSystem,
String.valueOf(i),
i + KEY_VALUE_SEPARATOR + value
);
final String contents = i + KEY_VALUE_SEPARATOR + value;
fileContents.add(contents);
return createFile(fileSystem, String.valueOf(i), contents);
}
)
.collect(Collectors.toSet());
@ -319,9 +323,10 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
public void readsSplitsCorrectly() throws IOException
{
InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
final InputStats inputStats = new InputStatsImpl();
Map<Long, String> actualTimestampToValue = new HashMap<>();
try (CloseableIterator<InputRow> iterator = reader.read()) {
try (CloseableIterator<InputRow> iterator = reader.read(inputStats)) {
while (iterator.hasNext()) {
InputRow row = iterator.next();
actualTimestampToValue.put(row.getTimestampFromEpoch(), row.getDimension(COLUMN).get(0));
@ -329,6 +334,9 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
}
Assert.assertEquals(timestampToValue, actualTimestampToValue);
long totalFileSize = fileContents.stream().mapToLong(String::length).sum();
Assert.assertEquals(totalFileSize, inputStats.getProcessedBytes());
}
@Test
@ -395,10 +403,12 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
public void readsSplitsCorrectly() throws IOException
{
InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
final InputStats inputStats = new InputStatsImpl();
try (CloseableIterator<InputRow> iterator = reader.read()) {
try (CloseableIterator<InputRow> iterator = reader.read(inputStats)) {
Assert.assertFalse(iterator.hasNext());
}
Assert.assertEquals(0, inputStats.getProcessedBytes());
}
@Test

View File

@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.msq.counters.ChannelCounters;
@ -42,9 +43,9 @@ public class CountableInputSourceReader implements InputSourceReader
}
@Override
public CloseableIterator<InputRow> read() throws IOException
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{
return inputSourceReader.read().map(inputRow -> {
return inputSourceReader.read(inputStats).map(inputRow -> {
channelCounters.incrementRowCount();
return inputRow;
});

View File

@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@ -75,7 +76,7 @@ public class NilInputSource implements InputSource
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {});
}

View File

@ -53,10 +53,12 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
@ -942,8 +944,10 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
Assert.assertEquals(CONTENT.length, inputStats.getProcessedBytes());
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
@ -1036,8 +1040,10 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
temporaryFolder.newFolder()
);
CloseableIterator<InputRow> iterator = reader.read();
final InputStats inputStats = new InputStatsImpl();
CloseableIterator<InputRow> iterator = reader.read(inputStats);
Assert.assertEquals(CONTENT.length, inputStats.getProcessedBytes());
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());

View File

@ -34,6 +34,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
private static final String FIFTEEN_MINUTE_NAME = "15m";
private final Meter processed;
private final Meter processedBytes;
private final Meter processedWithError;
private final Meter unparseable;
private final Meter thrownAway;
@ -42,6 +43,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
{
MetricRegistry metricRegistry = new MetricRegistry();
this.processed = metricRegistry.meter(PROCESSED);
this.processedBytes = metricRegistry.meter(PROCESSED_BYTES);
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
this.unparseable = metricRegistry.meter(UNPARSEABLE);
this.thrownAway = metricRegistry.meter(THROWN_AWAY);
@ -59,6 +61,18 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
processed.mark();
}
@Override
public long getProcessedBytes()
{
return processedBytes.getCount();
}
@Override
public void incrementProcessedBytes(long incrementByValue)
{
processedBytes.mark(incrementByValue);
}
@Override
public long getProcessedWithError()
{
@ -100,6 +114,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
{
return new RowIngestionMetersTotals(
processed.getCount(),
processedBytes.getCount(),
processedWithError.getCount(),
thrownAway.getCount(),
unparseable.getCount()
@ -113,18 +128,21 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters
Map<String, Object> oneMinute = new HashMap<>();
oneMinute.put(PROCESSED, processed.getOneMinuteRate());
oneMinute.put(PROCESSED_BYTES, processedBytes.getOneMinuteRate());
oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate());
oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate());
oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate());
Map<String, Object> fiveMinute = new HashMap<>();
fiveMinute.put(PROCESSED, processed.getFiveMinuteRate());
fiveMinute.put(PROCESSED_BYTES, processedBytes.getFiveMinuteRate());
fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate());
fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate());
fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate());
Map<String, Object> fifteenMinute = new HashMap<>();
fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate());
fifteenMinute.put(PROCESSED_BYTES, processedBytes.getFifteenMinuteRate());
fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate());
fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());

View File

@ -60,7 +60,7 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
this.rowIngestionMeters = rowIngestionMeters;
this.dimensions = ImmutableMap.copyOf(dimensions);
previousFireDepartmentMetrics = new FireDepartmentMetrics();
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0);
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
}
@Override

View File

@ -217,7 +217,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
)
);
return new FilteringCloseableInputRowIterator(
inputSourceReader.read(),
inputSourceReader.read(ingestionMeters),
rowFilter,
ingestionMeters,
parseExceptionHandler

View File

@ -64,10 +64,10 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@ -1214,10 +1214,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
*/
private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus, boolean segmentAvailabilityConfirmed)
{
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents = doGetRowStatsAndUnparseableEvents(
"true",
true
);
Pair<Map<String, Object>, Map<String, Object>> rowStatsAndUnparseableEvents =
doGetRowStatsAndUnparseableEvents("true", true);
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
@ -1538,6 +1536,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
return new RowIngestionMetersTotals(
((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
@ -1553,7 +1552,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
boolean includeUnparseable
)
{
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
@ -1594,7 +1593,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
@ -1625,7 +1624,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
boolean includeUnparseable
)
{
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
@ -1690,7 +1689,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return totals;
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
String full,
boolean includeUnparseable
)
{
if (currentSubTaskHolder == null) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.input;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.BytesCountingInputEntity;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@ -56,19 +57,25 @@ public class DruidSegmentInputFormat implements InputFormat
File temporaryDirectory
)
{
final InputEntity baseInputEntity;
if (source instanceof BytesCountingInputEntity) {
baseInputEntity = ((BytesCountingInputEntity) source).getBaseInputEntity();
} else {
baseInputEntity = source;
}
// this method handles the case when the entity comes from a tombstone or from a regular segment
Preconditions.checkArgument(
source instanceof DruidSegmentInputEntity,
baseInputEntity instanceof DruidSegmentInputEntity,
DruidSegmentInputEntity.class.getName() + " required, but "
+ source.getClass().getName() + " provided."
+ baseInputEntity.getClass().getName() + " provided."
);
final InputEntityReader retVal;
// Cast is safe here because of the precondition above passed
if (((DruidSegmentInputEntity) source).isFromTombstone()) {
retVal = new DruidTombstoneSegmentReader(source);
final DruidSegmentInputEntity druidSegmentEntity = (DruidSegmentInputEntity) baseInputEntity;
if (druidSegmentEntity.isFromTombstone()) {
return new DruidTombstoneSegmentReader(druidSegmentEntity);
} else {
retVal = new DruidSegmentReader(
return new DruidSegmentReader(
source,
indexIO,
inputRowSchema.getTimestampSpec(),
@ -78,6 +85,5 @@ public class DruidSegmentInputFormat implements InputFormat
temporaryDirectory
);
}
return retVal;
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.BytesCountingInputEntity;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
@ -57,6 +58,7 @@ import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
@ -71,12 +73,13 @@ import java.util.Set;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
private DruidSegmentInputEntity source;
private final InputEntity source;
private final IndexIO indexIO;
private final ColumnsFilter columnsFilter;
private final InputRowSchema inputRowSchema;
private final DimFilter dimFilter;
private final File temporaryDirectory;
private final Interval intervalFilter;
DruidSegmentReader(
final InputEntity source,
@ -88,7 +91,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
final File temporaryDirectory
)
{
this.source = (DruidSegmentInputEntity) source;
this.source = source;
this.indexIO = indexIO;
this.columnsFilter = columnsFilter;
this.inputRowSchema = new InputRowSchema(
@ -98,6 +101,14 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
);
this.dimFilter = dimFilter;
this.temporaryDirectory = temporaryDirectory;
final InputEntity baseInputEntity;
if (source instanceof BytesCountingInputEntity) {
baseInputEntity = ((BytesCountingInputEntity) source).getBaseInputEntity();
} else {
baseInputEntity = source;
}
this.intervalFilter = ((DruidSegmentInputEntity) baseInputEntity).getIntervalFilter();
}
@Override
@ -108,7 +119,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
new QueryableIndexStorageAdapter(
indexIO.loadIndex(segmentFile.file())
),
source.getIntervalFilter()
intervalFilter
);
final Sequence<Cursor> cursors = storageAdapter.getAdapter().makeCursors(

View File

@ -20,10 +20,9 @@
package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.InputEntity;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import java.util.List;
@ -35,16 +34,14 @@ import java.util.NoSuchElementException;
*/
public class DruidTombstoneSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
private DruidSegmentInputEntity source;
public DruidTombstoneSegmentReader(
InputEntity source
DruidSegmentInputEntity source
)
{
this.source = (DruidSegmentInputEntity) source;
if (!this.source.isFromTombstone()) {
throw new IAE("DruidSegmentInputEntity must be created from a tombstone but is not.");
}
Preconditions.checkArgument(
source.isFromTombstone(),
"DruidSegmentInputEntity must be created from a tombstone."
);
}
@Override

View File

@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
@ -148,7 +149,7 @@ public class GeneratorInputSource extends AbstractInputSource implements Splitta
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return CloseableIterators.withEmptyBaggage(new Iterator<InputRow>()
{

View File

@ -108,11 +108,13 @@ class StreamChunkParser<RecordType extends ByteEntity>
}
}
private List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<? extends ByteEntity> valueBytess)
private List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<? extends ByteEntity> valueBytes)
{
final FluentIterable<InputRow> iterable = FluentIterable
.from(valueBytess)
.transformAndConcat(bytes -> parser.parseBatch(bytes.getBuffer()));
.from(valueBytes)
.transform(ByteEntity::getBuffer)
.transform(this::incrementProcessedBytes)
.transformAndConcat(parser::parseBatch);
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
CloseableIterators.withEmptyBaggage(iterable.iterator()),
@ -123,6 +125,16 @@ class StreamChunkParser<RecordType extends ByteEntity>
return Lists.newArrayList(rowIterator);
}
/**
* Increments the processed bytes with the number of bytes remaining in the
* given buffer. This method must be called before reading the buffer.
*/
private ByteBuffer incrementProcessedBytes(final ByteBuffer recordByteBuffer)
{
rowIngestionMeters.incrementProcessedBytes(recordByteBuffer.remaining());
return recordByteBuffer;
}
private List<InputRow> parseWithInputFormat(
SettableByteEntityReader byteEntityReader,
List<? extends ByteEntity> valueBytess
@ -130,6 +142,7 @@ class StreamChunkParser<RecordType extends ByteEntity>
{
final List<InputRow> rows = new ArrayList<>();
for (ByteEntity valueBytes : valueBytess) {
rowIngestionMeters.incrementProcessedBytes(valueBytes.getBuffer().remaining());
byteEntityReader.setEntity(valueBytes);
try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
byteEntityReader.read(),

View File

@ -147,7 +147,6 @@ import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -158,7 +157,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHandlingTest
{
@ -690,25 +688,17 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = ImmutableList.of(
"Unable to parse value[foo] for field[met1]"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = ImmutableList.of(
"{t=3000000, dim1=foo, met1=foo}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test(timeout = 60_000L)
@ -792,6 +782,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED, 2,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.PROCESSED_WITH_ERROR, 1,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
@ -895,6 +886,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED, 2,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.PROCESSED_WITH_ERROR, 2,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
@ -906,12 +898,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = Arrays.asList(
"Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
@ -919,10 +909,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
"Unable to parse value[foo] for field[met1]",
"Timestamp[null] is unparseable! Event: null"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = Arrays.asList(
"{dim1=foo, met1=2.0, __fail__=x}",
@ -930,11 +917,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
"{t=1521251960729, dim1=foo, met1=foo}",
null
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
}
@ -1000,6 +983,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.PROCESSED_WITH_ERROR, 2,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
@ -1007,9 +991,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = ImmutableList.of(
"Timestamp[null] is unparseable! Event: {dim1=foo, met1=2.0, __fail__=x}",
@ -1017,10 +1000,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
"Unable to parse value[foo] for field[met1]",
"Timestamp[null] is unparseable! Event: null"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = Arrays.asList(
"{dim1=foo, met1=2.0, __fail__=x}",
@ -1028,12 +1008,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
"{t=1521251960729, dim1=foo, met1=foo}",
null
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
}
@ -1272,6 +1247,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
)

View File

@ -52,7 +52,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -62,7 +61,7 @@ import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@ -94,7 +93,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@RunWith(Parameterized.class)
public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervisorTaskTest
@ -880,46 +878,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
return baseToolbox;
} else {
return new TaskToolbox.Builder(baseToolbox)
.segmentCacheManager(
new SegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
@Override
public File getSegmentFiles(DataSegment segment)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
@Override
public boolean reserve(DataSegment segment)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
@Override
public boolean release(DataSegment segment)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
@Override
public void cleanup(DataSegment segment)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
{
throw new ISE("Expected no segment fetches by the compaction task");
}
}
)
.segmentCacheManager(new NoopSegmentCacheManager())
.build();
}
}

View File

@ -118,6 +118,7 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@ -151,7 +152,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -1975,43 +1975,19 @@ public class CompactionTaskTest
Map<DataSegment, File> segments
)
{
final SegmentCacheManager segmentCacheManager = new SegmentCacheManager()
final SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public File getSegmentFiles(DataSegment segment)
{
return Preconditions.checkNotNull(segments.get(segment));
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void cleanup(DataSegment segment)
{
// Do nothing.
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
{
throw new UnsupportedOperationException();
}
};
return new TaskToolbox.Builder()

View File

@ -123,13 +123,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class IndexTaskTest extends IngestionTestBase
@ -1521,22 +1519,12 @@ public class IndexTaskTest extends IngestionTestBase
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
List<String> expectedInputs = ImmutableList.of(
"{time=unparseable, d=a, val=1}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
List<String> expectedInputs = ImmutableList.of("{time=unparseable, d=a, val=1}");
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test
@ -1633,32 +1621,33 @@ public class IndexTaskTest extends IngestionTestBase
TaskStatus status = runTask(indexTask).lhs;
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(null, status.getErrorMsg());
Assert.assertNull(status.getErrorMsg());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
final int processedBytes = useInputFormatApi ? 657 : 0;
Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 4,
RowIngestionMeters.PROCESSED_BYTES, processedBytes,
RowIngestionMeters.UNPARSEABLE, 4,
RowIngestionMeters.THROWN_AWAY, 1
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, processedBytes,
RowIngestionMeters.UNPARSEABLE, 4,
RowIngestionMeters.THROWN_AWAY, 1
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages;
if (useInputFormatApi) {
@ -1692,10 +1681,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = Arrays.asList(
"this is not JSON",
@ -1706,14 +1692,10 @@ public class IndexTaskTest extends IngestionTestBase
"{time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0, val=1}",
"{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.DETERMINE_PARTITIONS);
if (useInputFormatApi) {
expectedMessages = Arrays.asList(
@ -1740,10 +1722,7 @@ public class IndexTaskTest extends IngestionTestBase
);
}
actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
expectedInputs = Arrays.asList(
"this is not JSON",
@ -1751,11 +1730,7 @@ public class IndexTaskTest extends IngestionTestBase
"{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}",
"{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
);
actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test
@ -1877,6 +1852,7 @@ public class IndexTaskTest extends IngestionTestBase
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
),
@ -1884,6 +1860,7 @@ public class IndexTaskTest extends IngestionTestBase
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, useInputFormatApi ? 182 : 0,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, useInputFormatApi ? 1 : 2
)
@ -1891,24 +1868,16 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = Arrays.asList(
"{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"{time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
"{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test
@ -2021,6 +1990,7 @@ public class IndexTaskTest extends IngestionTestBase
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_BYTES, useInputFormatApi ? 182 : 0,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, useInputFormatApi ? 1 : 2
),
@ -2028,6 +1998,7 @@ public class IndexTaskTest extends IngestionTestBase
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.PROCESSED_BYTES, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
)
@ -2035,24 +2006,16 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.DETERMINE_PARTITIONS);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = Arrays.asList(
"{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"{time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
"{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test
@ -2208,22 +2171,14 @@ public class IndexTaskTest extends IngestionTestBase
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>) reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
List<String> expectedInputs = ImmutableList.of(
"{column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
);
List<String> actualInputs = parseExceptionReports.stream().map((r) -> {
return (String) r.get("input");
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());
}
@Test

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
/**
* Utility class encapsulating a parse exception report used in tests.
*/
public class ParseExceptionReport
{
private final List<String> inputs;
private final List<String> errorMessages;
private ParseExceptionReport(List<String> inputs, List<String> errorMessages)
{
this.inputs = inputs;
this.errorMessages = errorMessages;
}
@SuppressWarnings("unchecked")
public static ParseExceptionReport forPhase(
IngestionStatsAndErrorsTaskReportData reportData,
String phase
)
{
List<LinkedHashMap<String, Object>> events =
(List<LinkedHashMap<String, Object>>) reportData.getUnparseableEvents().get(phase);
final List<String> inputs = new ArrayList<>();
final List<String> errorMessages = new ArrayList<>();
events.forEach(event -> {
inputs.add((String) event.get("input"));
errorMessages.add(((List<String>) event.get("details")).get(0));
});
return new ParseExceptionReport(inputs, errorMessages);
}
public List<String> getInputs()
{
return inputs;
}
public List<String> getErrorMessages()
{
return errorMessages;
}
}

View File

@ -828,6 +828,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
);
Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0.0,
"processedBytes", 0.0,
"unparseable", 0.0,
"thrownAway", 0.0,
"processedWithError", 0.0

View File

@ -31,6 +31,7 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Ignore;
@ -132,13 +133,13 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
false
);
final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(200, 0, 0, 0);
final RowIngestionMetersTotals expectedTotals = RowMeters.with().totalProcessed(200);
final Map<String, Object> expectedReports =
maxNumConcurrentSubTasks <= 1
? buildExpectedTaskReportSequential(
task.getId(),
ImmutableList.of(),
new RowIngestionMetersTotals(0, 0, 0, 0),
RowMeters.with().totalProcessed(0),
expectedTotals
)
: buildExpectedTaskReportParallel(
@ -163,7 +164,6 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
INTERVAL_TO_INDEX,
inputDir,
"test_*",
//new DimensionRangePartitionsSpec(targetRowsPerSegment, null, DIMS, false),
new SingleDimensionPartitionsSpec(targetRowsPerSegment, null, DIM1, false),
10,
false,
@ -172,7 +172,7 @@ public class MultiPhaseParallelIndexingRowStatsTest extends AbstractMultiPhasePa
Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(),
new RowIngestionMetersTotals(200, 0, 0, 0)
new RowIngestionMetersTotals(200, 0, 0, 0, 0)
);
Map<String, Object> actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
compareTaskReports(expectedReports, actualReports);

View File

@ -437,6 +437,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Collections.emptyList()
);
Map<String, Object> actualReports = task.doGetLiveReports("full");
final long processedBytes = useInputFormatApi ? 335 : 0;
Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(
@ -453,11 +454,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
1L
)
),
new RowIngestionMetersTotals(
10,
1,
1,
1)
new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1)
);
compareTaskReports(expectedReports, actualReports);
}
@ -492,12 +489,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask();
Map<String, Object> actualReports = executedTask.doGetLiveReports("full");
RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(
10,
1,
1,
1
);
final long processedBytes = useInputFormatApi ? 335 : 0;
RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, processedBytes, 1, 1, 1);
List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of(
new ParseExceptionReport(
"{ts=2017unparseable}",
@ -518,7 +511,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
expectedReports = buildExpectedTaskReportSequential(
task.getId(),
expectedUnparseableEvents,
new RowIngestionMetersTotals(0, 0, 0, 0),
new RowIngestionMetersTotals(0, 0, 0, 0, 0),
expectedTotals
);
} else {
@ -532,7 +525,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
compareTaskReports(expectedReports, actualReports);
System.out.println(actualReports);
}
@Test
@ -885,17 +877,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
boolean appendToExisting,
boolean splittableInputSource
)
{
return newTask(interval, segmentGranularity, appendToExisting, splittableInputSource, false);
}
private ParallelIndexSupervisorTask newTask(
@Nullable Interval interval,
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
boolean isReplace
)
{
return newTask(
interval,

View File

@ -24,19 +24,23 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.BytesCountingInputEntity;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.hll.HyperLogLogHash;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
@ -54,7 +58,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
@ -68,11 +72,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.junit.Assert.assertThrows;
@ -82,11 +86,13 @@ public class DruidSegmentReaderTest extends NullHandlingTest
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File segmentDirectory;
private long segmentSize;
private final IndexIO indexIO = TestHelper.getTestIndexIO();
private DimensionsSpec dimensionsSpec;
private List<AggregatorFactory> metrics;
private List<InputRow> rows;
private InputStats inputStats;
@Before
public void setUp() throws IOException
@ -102,7 +108,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
new CountAggregatorFactory("cnt"),
new HyperUniquesAggregatorFactory("met_s", "strCol")
);
rows = ImmutableList.of(
final List<InputRow> rows = ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("strCol", "dblCol"),
@ -121,7 +127,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
)
);
createTestSetup();
inputStats = new InputStatsImpl();
persistSegment(rows);
}
@Test
@ -169,6 +176,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -184,7 +192,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
metrics = ImmutableList.of();
List<String> columnNames = ImmutableList.of("longCol", "a", "b");
rows = ImmutableList.of(
final List<InputRow> rows = ImmutableList.of(
new MapBasedInputRow(
DateTimes.utc(1667115726217L),
columnNames,
@ -217,10 +225,14 @@ public class DruidSegmentReaderTest extends NullHandlingTest
)
);
createTestSetup();
persistSegment(rows);
final InputStats inputStats = new InputStatsImpl();
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntityWithParams(Intervals.of("2022-10-30/2022-10-31"), columnNames, null),
new BytesCountingInputEntity(
makeInputEntity(Intervals.of("2022-10-30/2022-10-31"), segmentDirectory, columnNames, null),
inputStats
),
indexIO,
new TimestampSpec("__time", "iso", null),
dimensionsSpec,
@ -233,10 +245,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
temporaryFolder.newFolder()
);
List<InputRow> expectedRows = new ArrayList<>();
expectedRows.add(rows.get(2));
expectedRows.add(rows.get(1));
Assert.assertEquals(expectedRows, readRows(reader));
Assert.assertEquals(Arrays.asList(rows.get(2), rows.get(1)), readRows(reader));
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -247,32 +257,27 @@ public class DruidSegmentReaderTest extends NullHandlingTest
);
Assert.assertFalse(reader.intermediateRowIterator().hasNext());
Assert.assertEquals(
Collections.emptyList(),
readRows(reader)
);
}
@Test
public void testDruidTombstoneSegmentReaderBadEntity()
{
assertThrows(ClassCastException.class, () -> {
new DruidTombstoneSegmentReader(
new FileEntity(null));
});
Assert.assertTrue(readRows(reader).isEmpty());
}
@Test
public void testDruidTombstoneSegmentReaderNotCreatedFromTombstone()
{
Exception exception = assertThrows(IllegalArgumentException.class, () -> {
new DruidTombstoneSegmentReader(makeInputEntity(Intervals.of("2000/P1D")));
});
String expectedMessage =
"DruidSegmentInputEntity must be created from a tombstone but is not.";
String actualMessage = exception.getMessage();
Assert.assertEquals(expectedMessage, actualMessage);
Exception exception = assertThrows(
IllegalArgumentException.class,
() -> new DruidTombstoneSegmentReader(
makeInputEntity(
Intervals.of("2000/P1D"),
segmentDirectory,
Collections.emptyList(),
Collections.emptyList()
)
)
);
Assert.assertEquals(
"DruidSegmentInputEntity must be created from a tombstone.",
exception.getMessage()
);
}
@Test
@ -320,6 +325,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -362,6 +368,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -405,6 +412,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -446,6 +454,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -482,6 +491,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -529,6 +539,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -576,6 +587,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -623,6 +635,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
),
readRows(reader)
);
Assert.assertEquals(segmentSize, inputStats.getProcessedBytes());
}
@Test
@ -670,14 +683,17 @@ public class DruidSegmentReaderTest extends NullHandlingTest
Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
}
private DruidSegmentInputEntity makeInputEntity(final Interval interval)
private InputEntity makeInputEntity(final Interval interval)
{
return makeInputEntity(interval, segmentDirectory, ImmutableList.of("strCol", "dblCol"), ImmutableList.of("cnt", "met_s"));
}
private DruidSegmentInputEntity makeInputEntityWithParams(final Interval interval, final List<String> dimensions, final List<String> metrics)
{
return makeInputEntity(interval, segmentDirectory, dimensions, metrics);
return new BytesCountingInputEntity(
makeInputEntity(
interval,
segmentDirectory,
ImmutableList.of("strCol", "dblCol"),
ImmutableList.of("cnt", "met_s")
),
inputStats
);
}
public static DruidSegmentInputEntity makeInputEntity(
@ -688,43 +704,13 @@ public class DruidSegmentReaderTest extends NullHandlingTest
)
{
return new DruidSegmentInputEntity(
new SegmentCacheManager()
new NoopSegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public File getSegmentFiles(DataSegment segment)
{
return segmentDirectory;
}
@Override
public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
{
throw new UnsupportedOperationException();
}
},
DataSegment.builder()
.dataSource("ds")
@ -741,45 +727,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
public static DruidSegmentInputEntity makeTombstoneInputEntity(final Interval interval)
{
return new DruidSegmentInputEntity(
new SegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public File getSegmentFiles(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
{
throw new UnsupportedOperationException();
}
},
new NoopSegmentCacheManager(),
DataSegment.builder()
.dataSource("ds")
.interval(Intervals.of("2000/P1D"))
@ -792,7 +740,6 @@ public class DruidSegmentReaderTest extends NullHandlingTest
);
}
private List<InputRow> readRows(DruidSegmentReader reader) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
@ -815,7 +762,6 @@ public class DruidSegmentReaderTest extends NullHandlingTest
return rows;
}
private static HyperLogLogCollector makeHLLC(final String... values)
{
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
@ -825,7 +771,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
return collector;
}
private void createTestSetup() throws IOException
private void persistSegment(List<InputRow> rows) throws IOException
{
final IncrementalIndex incrementalIndex =
IndexBuilder.create()
@ -850,6 +796,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
new IndexSpec(),
null
);
segmentSize = FileUtils.getFileSize(segmentDirectory);
}
finally {
incrementalIndex.close();

View File

@ -42,6 +42,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
@ -293,7 +294,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return new CloseableIterator<InputRow>()
{
@ -345,7 +346,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read()
public CloseableIterator<InputRow> read(InputStats inputStats)
{
final Iterator<InputRow> inputRowIterator = IDX_TASK_INPUT_ROWS.iterator();
return CloseableIterators.withEmptyBaggage(inputRowIterator);

View File

@ -27,9 +27,11 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@ -88,7 +90,8 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
);
int read = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
final InputStats inputStats = new InputStatsImpl();
try (CloseableIterator<InputRow> iterator = reader.read(inputStats)) {
for (; read < NUM_ROWS && iterator.hasNext(); read++) {
final InputRow inputRow = iterator.next();
Assert.assertEquals(DateTimes.of(TIMESTAMP_STRING), inputRow.getTimestamp());
@ -96,6 +99,7 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
}
}
Assert.assertTrue(inputStats.getProcessedBytes() > NUM_ROWS * supplier.getMinRowSize());
Assert.assertEquals(NUM_ROWS, read);
Assert.assertTrue(supplier.isClosed());
}
@ -120,11 +124,13 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
);
int read = 0;
try (CloseableIterator<InputRow> iterator = reader.read()) {
final InputStats inputStats = new InputStatsImpl();
try (CloseableIterator<InputRow> iterator = reader.read(inputStats)) {
for (; read < NUM_ROWS && iterator.hasNext(); read++) {
iterator.next();
}
}
Assert.assertEquals(0, inputStats.getProcessedBytes());
Assert.assertEquals(0, read);
Assert.assertTrue(supplier.isClosed());
}
@ -255,5 +261,10 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
{
throw new UnsupportedOperationException();
}
private long getMinRowSize()
{
return TIMESTAMP_STRING.length() + (NUM_COLS - 1) * STR_LEN;
}
}
}

View File

@ -31,6 +31,10 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
@ -42,19 +46,36 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.MetadataTaskStorage;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -62,9 +83,16 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -77,16 +105,32 @@ import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.assertj.core.api.Assertions;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@ -97,13 +141,22 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
protected static final ObjectMapper OBJECT_MAPPER;
protected static final DataSchema OLD_DATA_SCHEMA;
protected static final DataSchema NEW_DATA_SCHEMA = new DataSchema(
@ -147,6 +200,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
protected TaskStorage taskStorage;
protected TaskLockbox taskLockbox;
protected IndexerMetadataStorageCoordinator metadataStorageCoordinator;
protected final Set<Integer> checkpointRequestsHash = new HashSet<>();
static {
OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
@ -494,6 +548,167 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
metadataStorageCoordinator = null;
}
protected void verifyTaskMetrics(
SeekableStreamIndexTask<?, ?, ?> task,
RowIngestionMetersTotals expectedTotals
)
{
Assert.assertEquals(expectedTotals, task.getRunner().getRowIngestionMeters().getTotals());
}
protected abstract QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate();
protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, boolean doHandoff)
throws IOException
{
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
directory = tempFolder.newFolder();
final TaskConfig taskConfig = new TaskConfig(
new File(directory, "baseDir").getPath(),
new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,
true,
null,
null,
null,
false,
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createPendingSegmentsTable();
derbyConnector.createSegmentTable();
derbyConnector.createRulesTable();
derbyConnector.createConfigTable();
derbyConnector.createTaskTables();
derbyConnector.createAuditTable();
taskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
new DerbyMetadataStorageActionHandlerFactory(
derbyConnector,
derby.metadataTablesConfigSupplier().get(),
objectMapper
)
);
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
taskStorage,
metadataStorageCoordinator,
emitter,
new SupervisorManager(null)
{
@Override
public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
@Nullable DataSourceMetadata previousDataSourceMetadata
)
{
// log.info("Adding checkpoint hash to the set");
checkpointRequestsHash.add(
Objects.hash(
supervisorId,
taskGroupId,
previousDataSourceMetadata
)
);
return true;
}
},
objectMapper
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox,
new TaskAuditLogConfig(false)
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
SegmentDescriptor descriptor,
Executor exec,
Runnable handOffRunnable
)
{
if (doHandoff) {
// Simulate immediate handoff
exec.execute(handOffRunnable);
}
return true;
}
@Override
public void start()
{
//Noop
}
@Override
public void close()
{
//Noop
}
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(
taskConfig,
null, // taskExecutorNode
taskActionClientFactory,
emitter,
dataSegmentPusher,
new TestDataSegmentKiller(),
null, // DataSegmentMover
null, // DataSegmentArchiver
new TestDataSegmentAnnouncer(),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeQueryRunnerConglomerate,
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentCacheManagerFactory(objectMapper),
objectMapper,
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getIndexMergerV9Factory(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
new SingleFileTaskReportFileWriter(reportsFile),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
testUtils.getRowIngestionMetersFactory(),
new TestAppenderatorsManager(),
new NoopOverlordClient(),
null,
null,
null,
null,
"1"
);
}
protected class SegmentDescriptorAndExpectedDim1Values
{
final SegmentDescriptor segmentDescriptor;

View File

@ -99,8 +99,7 @@ public class DataGenerator
for (ColumnValueGenerator generator : columnGenerators) {
event.put(generator.getSchema().getName(), generator.generateRowValue());
}
MapBasedInputRow row = new MapBasedInputRow(nextTimestamp(), dimensionNames, event);
return row;
return new MapBasedInputRow(nextTimestamp(), dimensionNames, event);
}
/**

View File

@ -29,7 +29,7 @@ import java.util.Map;
*/
public class NoopRowIngestionMeters implements RowIngestionMeters
{
private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0);
private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
@Override
public long getProcessed()

View File

@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.guice.annotations.ExtensionPoint;
import java.util.Map;
@ -30,12 +31,13 @@ import java.util.Map;
* RowIngestionMeters to avoid unnecessary overhead from maintaining these moving averages.
*/
@ExtensionPoint
public interface RowIngestionMeters
public interface RowIngestionMeters extends InputStats
{
String BUILD_SEGMENTS = "buildSegments";
String DETERMINE_PARTITIONS = "determinePartitions";
String PROCESSED = "processed";
String PROCESSED_BYTES = "processedBytes";
String PROCESSED_WITH_ERROR = "processedWithError";
String UNPARSEABLE = "unparseable";
String THROWN_AWAY = "thrownAway";
@ -43,6 +45,18 @@ public interface RowIngestionMeters
long getProcessed();
void incrementProcessed();
@Override
default void incrementProcessedBytes(long incrementByValue)
{
}
@Override
default long getProcessedBytes()
{
return 0;
}
long getProcessedWithError();
void incrementProcessedWithError();
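The two default methods above keep existing implementations compiling while letting byte-aware meters participate in the new API. A minimal sketch, not part of this commit, of how a meter that tracks bytes can double as the InputStats handed to InputSourceReader.read() (the method name and the surrounding loop are illustrative, using names that appear elsewhere in this commit):
static long readAndCountBytes(InputSourceReader reader) throws IOException
{
  final RowIngestionMeters meters = new SimpleRowIngestionMeters();
  try (CloseableIterator<InputRow> iterator = reader.read(meters)) {
    while (iterator.hasNext()) {
      iterator.next();              // the reader increments processedBytes as entities are consumed
      meters.incrementProcessed();  // row-level counters remain the caller's responsibility
    }
  }
  return meters.getProcessedBytes();
}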

View File

@ -27,6 +27,7 @@ import java.util.Objects;
public class RowIngestionMetersTotals
{
private final long processed;
private final long processedBytes;
private final long processedWithError;
private final long thrownAway;
private final long unparseable;
@ -34,12 +35,14 @@ public class RowIngestionMetersTotals
@JsonCreator
public RowIngestionMetersTotals(
@JsonProperty("processed") long processed,
@JsonProperty("processedBytes") long processedBytes,
@JsonProperty("processedWithError") long processedWithError,
@JsonProperty("thrownAway") long thrownAway,
@JsonProperty("unparseable") long unparseable
)
{
this.processed = processed;
this.processedBytes = processedBytes;
this.processedWithError = processedWithError;
this.thrownAway = thrownAway;
this.unparseable = unparseable;
@ -51,6 +54,12 @@ public class RowIngestionMetersTotals
return processed;
}
@JsonProperty
public long getProcessedBytes()
{
return processedBytes;
}
@JsonProperty
public long getProcessedWithError()
{
@ -80,6 +89,7 @@ public class RowIngestionMetersTotals
}
RowIngestionMetersTotals that = (RowIngestionMetersTotals) o;
return processed == that.processed
&& processedBytes == that.processedBytes
&& processedWithError == that.processedWithError
&& thrownAway == that.thrownAway
&& unparseable == that.unparseable;
@ -88,7 +98,7 @@ public class RowIngestionMetersTotals
@Override
public int hashCode()
{
return Objects.hash(processed, processedWithError, thrownAway, unparseable);
return Objects.hash(processed, processedBytes, processedWithError, thrownAway, unparseable);
}
@Override
@ -96,6 +106,7 @@ public class RowIngestionMetersTotals
{
return "RowIngestionMetersTotals{" +
"processed=" + processed +
", processedBytes=" + processedBytes +
", processedWithError=" + processedWithError +
", thrownAway=" + thrownAway +
", unparseable=" + unparseable +

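Because the new field is exposed through @JsonProperty like the others, it appears automatically wherever the totals are serialized into a task report. A small sketch, not part of this commit, assuming a plain Jackson ObjectMapper and illustrative values:
static String totalsAsReportJson() throws JsonProcessingException
{
  final ObjectMapper mapper = new ObjectMapper();
  // processed=4, processedBytes=657, processedWithError=0, thrownAway=1, unparseable=4
  final RowIngestionMetersTotals totals = new RowIngestionMetersTotals(4, 657, 0, 1, 4);
  return mapper.writeValueAsString(totals);  // contains "processedBytes":657 next to the other counters
}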
View File

@ -21,20 +21,13 @@ package org.apache.druid.segment.incremental;
import java.util.Map;
public class MutableRowIngestionMeters implements RowIngestionMeters
public class SimpleRowIngestionMeters implements RowIngestionMeters
{
private long processed;
private long processedWithError;
private long unparseable;
private long thrownAway;
public MutableRowIngestionMeters()
{
this.processed = 0;
this.processedWithError = 0;
this.unparseable = 0;
this.thrownAway = 0;
}
private long processedBytes;
@Override
public long getProcessed()
@ -48,6 +41,18 @@ public class MutableRowIngestionMeters implements RowIngestionMeters
processed++;
}
@Override
public void incrementProcessedBytes(long increment)
{
processedBytes += increment;
}
@Override
public long getProcessedBytes()
{
return processedBytes;
}
@Override
public long getProcessedWithError()
{
@ -87,7 +92,13 @@ public class MutableRowIngestionMeters implements RowIngestionMeters
@Override
public RowIngestionMetersTotals getTotals()
{
return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable);
return new RowIngestionMetersTotals(
processed,
processedBytes,
processedWithError,
thrownAway,
unparseable
);
}
@Override
@ -102,5 +113,6 @@ public class MutableRowIngestionMeters implements RowIngestionMeters
this.processedWithError += rowIngestionMetersTotals.getProcessedWithError();
this.unparseable += rowIngestionMetersTotals.getUnparseable();
this.thrownAway += rowIngestionMetersTotals.getThrownAway();
this.processedBytes += rowIngestionMetersTotals.getProcessedBytes();
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.transform;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import java.io.IOException;
@ -38,9 +39,9 @@ public class TransformingInputSourceReader implements InputSourceReader
}
@Override
public CloseableIterator<InputRow> read() throws IOException
public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
{
return delegate.read().map(transformer::transform);
return delegate.read(inputStats).map(transformer::transform);
}
@Override

View File

@ -19,72 +19,51 @@
package org.apache.druid.segment.incremental;
import java.util.Map;
public class SimpleRowIngestionMeters implements RowIngestionMeters
/**
* Utility class to build {@link RowIngestionMetersTotals}, used in tests.
*/
public class RowMeters
{
private long processed;
private long processedBytes;
private long processedWithError;
private long unparseable;
private long thrownAway;
@Override
public long getProcessed()
/**
* Creates a new {@link RowMeters} that can be used to build an instance of
* {@link RowIngestionMetersTotals}.
*/
public static RowMeters with()
{
return processed;
return new RowMeters();
}
@Override
public void incrementProcessed()
public RowMeters bytes(long processedBytes)
{
processed++;
this.processedBytes = processedBytes;
return this;
}
@Override
public long getProcessedWithError()
public RowMeters errors(long processedWithError)
{
return processedWithError;
this.processedWithError = processedWithError;
return this;
}
@Override
public void incrementProcessedWithError()
public RowMeters unparseable(long unparseable)
{
processedWithError++;
this.unparseable = unparseable;
return this;
}
@Override
public long getUnparseable()
public RowMeters thrownAway(long thrownAway)
{
return unparseable;
this.thrownAway = thrownAway;
return this;
}
@Override
public void incrementUnparseable()
public RowIngestionMetersTotals totalProcessed(long processed)
{
unparseable++;
}
@Override
public long getThrownAway()
{
return thrownAway;
}
@Override
public void incrementThrownAway()
{
thrownAway++;
}
@Override
public RowIngestionMetersTotals getTotals()
{
return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable);
}
@Override
public Map<String, Object> getMovingAverages()
{
throw new UnsupportedOperationException();
return new RowIngestionMetersTotals(processed, processedBytes, processedWithError, thrownAway, unparseable);
}
}
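A short usage sketch, not part of this commit, showing how the builder reads in the tests above (the literal values are illustrative):
// Equivalent to new RowIngestionMetersTotals(10, 335, 1, 1, 1)
final RowIngestionMetersTotals totals = RowMeters.with()
    .bytes(335)
    .errors(1)
    .unparseable(1)
    .thrownAway(1)
    .totalProcessed(10);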

View File

@ -22,25 +22,26 @@ package org.apache.druid.segment.incremental;
import org.junit.Assert;
import org.junit.Test;
public class MutableRowIngestionMetersTest
public class SimpleRowIngestionMetersTest
{
@Test
public void testIncrement()
{
MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters();
mutableRowIngestionMeters.incrementProcessed();
mutableRowIngestionMeters.incrementProcessedWithError();
mutableRowIngestionMeters.incrementUnparseable();
mutableRowIngestionMeters.incrementThrownAway();
Assert.assertEquals(mutableRowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 1, 1, 1));
SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
rowIngestionMeters.incrementProcessed();
rowIngestionMeters.incrementProcessedBytes(5);
rowIngestionMeters.incrementProcessedWithError();
rowIngestionMeters.incrementUnparseable();
rowIngestionMeters.incrementThrownAway();
Assert.assertEquals(rowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 5, 1, 1, 1));
}
@Test
public void testAddRowIngestionMetersTotals()
{
MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters();
RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 1, 0, 1);
mutableRowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals);
Assert.assertEquals(mutableRowIngestionMeters.getTotals(), rowIngestionMetersTotals);
SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 0, 1, 0, 1);
rowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals);
Assert.assertEquals(rowIngestionMeters.getTotals(), rowIngestionMetersTotals);
}
}

View File

@ -44,7 +44,7 @@ import java.util.Map;
public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
{
private final InputRowSchema inputRowSchema;
private final SqlEntity source;
private final InputEntity source;
private final File temporaryDirectory;
private final ObjectMapper objectMapper;
@ -57,7 +57,7 @@ public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
)
{
this.inputRowSchema = inputRowSchema;
this.source = (SqlEntity) source;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.objectMapper = objectMapper;
}

View File

@ -44,15 +44,11 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
@ -553,18 +549,14 @@ public class SQLMetadataStorageActionHandlerTest
private Integer getUnmigratedTaskCount()
{
return handler.getConnector().retryWithHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws SQLException
{
String sql = String.format(Locale.ENGLISH,
"SELECT COUNT(*) FROM %s WHERE type is NULL or group_id is NULL",
entryTable);
ResultSet resultSet = handle.getConnection().createStatement().executeQuery(sql);
resultSet.next();
return resultSet.getInt(1);
}
handle -> {
String sql = StringUtils.format(
"SELECT COUNT(*) FROM %s WHERE type is NULL or group_id is NULL",
entryTable
);
ResultSet resultSet = handle.getConnection().createStatement().executeQuery(sql);
resultSet.next();
return resultSet.getInt(1);
}
);
}

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
@ -35,6 +37,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
public class SqlEntityTest
{
@ -47,10 +50,6 @@ public class SqlEntityTest
String VALID_SQL = "SELECT timestamp,a,b FROM FOOS_TABLE";
String INVALID_SQL = "DONT SELECT timestamp,a,b FROM FOOS_TABLE";
String resultJson = "[{\"a\":\"0\","
+ "\"b\":\"0\","
+ "\"timestamp\":\"2011-01-12T00:00:00.000Z\""
+ "}]";
@Before
public void setUp()
@ -65,11 +64,8 @@ public class SqlEntityTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
File tmpFile = File.createTempFile(
"testQueryResults",
""
);
final InputRow expectedRow = testUtils.createTableWithRows(TABLE_NAME_1, 1).get(0);
File tmpFile = File.createTempFile("testQueryResults", "");
InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
VALID_SQL,
testUtils.getDerbyFirehoseConnector(),
@ -79,56 +75,34 @@ public class SqlEntityTest
);
InputStream queryInputStream = new FileInputStream(queryResult.file());
String actualJson = IOUtils.toString(queryInputStream, StandardCharsets.UTF_8);
Assert.assertEquals(actualJson, resultJson);
String expectedJson = mapper.writeValueAsString(
Collections.singletonList(((MapBasedInputRow) expectedRow).getEvent())
);
Assert.assertEquals(actualJson, expectedJson);
testUtils.dropTable(TABLE_NAME_1);
}
@Test(expected = IOException.class)
public void testFailOnInvalidQuery() throws IOException
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
File tmpFile = File.createTempFile(
"testQueryResults",
""
);
InputEntity.CleanableFile queryResult = SqlEntity.openCleanableFile(
INVALID_SQL,
testUtils.getDerbyFirehoseConnector(),
mapper,
true,
tmpFile
);
Assert.assertTrue(tmpFile.exists());
}
@Test
public void testFileDeleteOnInvalidQuery() throws IOException
{
//The test parameters here are same as those used for testFailOnInvalidQuery().
//The only difference is that this test checks if the temporary file is deleted upon failure.
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 1);
File tmpFile = File.createTempFile(
"testQueryResults",
""
testUtils.createTableWithRows(TABLE_NAME_1, 1);
File tmpFile = File.createTempFile("testQueryResults", "");
Assert.assertTrue(tmpFile.exists());
Assert.assertThrows(
IOException.class,
() -> SqlEntity.openCleanableFile(
INVALID_SQL,
testUtils.getDerbyFirehoseConnector(),
mapper,
true,
tmpFile
)
);
try {
SqlEntity.openCleanableFile(
INVALID_SQL,
testUtils.getDerbyFirehoseConnector(),
mapper,
true,
tmpFile
);
}
// Lets catch the exception so as to test temporary file deletion.
catch (IOException e) {
Assert.assertFalse(tmpFile.exists());
}
// Verify that the temporary file is cleaned up
Assert.assertFalse(tmpFile.exists());
}
}

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.data.input.ColumnsFilter;
@ -34,11 +34,11 @@ import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputStatsImpl;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
@ -57,7 +57,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -67,14 +66,8 @@ import java.util.stream.Stream;
public class SqlInputSourceTest
{
private static final List<File> FIREHOSE_TMP_DIRS = new ArrayList<>();
private final String TABLE_NAME_1 = "FOOS_TABLE_1";
private final String TABLE_NAME_2 = "FOOS_TABLE_2";
private final List<String> SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1");
private final List<String> SQLLIST2 = ImmutableList.of(
"SELECT timestamp,a,b FROM FOOS_TABLE_1",
"SELECT timestamp,a,b FROM FOOS_TABLE_2"
);
private final String TABLE_1 = "FOOS_TABLE_1";
private final String TABLE_2 = "FOOS_TABLE_2";
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
@ -83,6 +76,7 @@ public class SqlInputSourceTest
),
ColumnsFilter.all()
);
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper mapper = TestHelper.makeSmileMapper();
@ -104,25 +98,6 @@ public class SqlInputSourceTest
}
}
private void assertResult(List<Row> rows, List<String> sqls)
{
Assert.assertEquals(10 * sqls.size(), rows.size());
rows.sort(Comparator.comparing(Row::getTimestamp)
.thenComparingInt(r -> Integer.valueOf(r.getDimension("a").get(0)))
.thenComparingInt(r -> Integer.valueOf(r.getDimension("b").get(0))));
int rowCount = 0;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < sqls.size(); j++) {
final Row row = rows.get(rowCount);
String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
Assert.assertEquals(timestampSt, row.getTimestamp().toString());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("a").get(0)).intValue());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("b").get(0)).intValue());
rowCount++;
}
}
}
private File createFirehoseTmpDir(String dirSuffix) throws IOException
{
final File firehoseTempDir = File.createTempFile(
@ -141,7 +116,8 @@ public class SqlInputSourceTest
mapper.registerSubtypes(TestSerdeFirehoseConnector.class);
final SqlInputSourceTest.TestSerdeFirehoseConnector testSerdeFirehoseConnector = new SqlInputSourceTest.TestSerdeFirehoseConnector(
new MetadataStorageConnectorConfig());
final SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testSerdeFirehoseConnector, mapper);
final SqlInputSource sqlInputSource =
new SqlInputSource(SqlTestUtils.selectFrom(TABLE_1), true, testSerdeFirehoseConnector, mapper);
final String valueString = mapper.writeValueAsString(sqlInputSource);
final SqlInputSource inputSourceFromJson = mapper.readValue(valueString, SqlInputSource.class);
Assert.assertEquals(sqlInputSource, inputSourceFromJson);
@ -152,17 +128,26 @@ public class SqlInputSourceTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
final List<InputRow> expectedRows = testUtils.createTableWithRows(TABLE_1, 10);
final File tempDir = createFirehoseTmpDir("testSingleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper);
final InputStats inputStats = new InputStatsImpl();
SqlInputSource sqlInputSource = new SqlInputSource(
SqlTestUtils.selectFrom(TABLE_1),
true,
testUtils.getDerbyFirehoseConnector(),
mapper
);
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
CloseableIterator<InputRow> resultIterator = sqlReader.read();
final List<Row> rows = new ArrayList<>();
while (resultIterator.hasNext()) {
rows.add(resultIterator.next());
}
assertResult(rows, SQLLIST1);
testUtils.dropTable(TABLE_NAME_1);
CloseableIterator<InputRow> resultIterator = sqlReader.read(inputStats);
final List<InputRow> rows = Lists.newArrayList(resultIterator);
// Records for each split are written to a temp file as a json array
// file size = 1B (array open char) + 10 records * 60B (including trailing comma)
Assert.assertEquals(601, inputStats.getProcessedBytes());
Assert.assertEquals(expectedRows, rows);
testUtils.dropTable(TABLE_1);
}
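// Illustrative sketch (not part of this commit): the byte-counting read pattern exercised by
// testSingleSplit above, assuming the InputSourceReader.read(InputStats) method and the
// InputStatsImpl test helper imported earlier; the helper name readWithStats is made up here.
// The expected 601 bytes follow from the comment in the test: 1 byte for the array-open
// character plus 10 records of roughly 60 bytes each.
private static List<InputRow> readWithStats(InputSourceReader reader, InputStats stats) throws IOException
{
  // read(stats) increments stats.getProcessedBytes() as bytes are pulled from the input entity
  try (CloseableIterator<InputRow> iterator = reader.read(stats)) {
    return Lists.newArrayList(iterator);
  }
}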
@ -171,19 +156,27 @@ public class SqlInputSourceTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
testUtils.createAndUpdateTable(TABLE_NAME_2, 10);
final List<InputRow> expectedRowsTable1 = testUtils.createTableWithRows(TABLE_1, 10);
final List<InputRow> expectedRowsTable2 = testUtils.createTableWithRows(TABLE_2, 10);
final File tempDir = createFirehoseTmpDir("testMultipleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper);
SqlInputSource sqlInputSource = new SqlInputSource(
SqlTestUtils.selectFrom(TABLE_1, TABLE_2),
true,
testUtils.getDerbyFirehoseConnector(),
mapper
);
final InputStats inputStats = new InputStatsImpl();
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
CloseableIterator<InputRow> resultIterator = sqlReader.read();
final List<Row> rows = new ArrayList<>();
while (resultIterator.hasNext()) {
rows.add(resultIterator.next());
}
assertResult(rows, SQLLIST2);
testUtils.dropTable(TABLE_NAME_1);
testUtils.dropTable(TABLE_NAME_2);
CloseableIterator<InputRow> resultIterator = sqlReader.read(inputStats);
final List<InputRow> rows = Lists.newArrayList(resultIterator);
Assert.assertEquals(expectedRowsTable1, rows.subList(0, 10));
Assert.assertEquals(expectedRowsTable2, rows.subList(10, 20));
Assert.assertEquals(1202, inputStats.getProcessedBytes());
testUtils.dropTable(TABLE_1);
testUtils.dropTable(TABLE_2);
}
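// Illustrative note (not part of this commit): with two splits, each SQL result set is written
// to its own temp file of 601 bytes (1 array-open byte + 10 * 60-byte records), so the expected
// total is 2 * 601 = 1202 processed bytes, matching the assertion above.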
@Test
@ -191,10 +184,12 @@ public class SqlInputSourceTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper);
final List<String> sqls = SqlTestUtils.selectFrom(TABLE_1, TABLE_2);
SqlInputSource sqlInputSource =
new SqlInputSource(sqls, true, testUtils.getDerbyFirehoseConnector(), mapper);
InputFormat inputFormat = EasyMock.createMock(InputFormat.class);
Stream<InputSplit<String>> sqlSplits = sqlInputSource.createSplits(inputFormat, null);
Assert.assertEquals(SQLLIST2, sqlSplits.map(InputSplit::get).collect(Collectors.toList()));
Assert.assertEquals(sqls, sqlSplits.map(InputSplit::get).collect(Collectors.toList()));
Assert.assertEquals(2, sqlInputSource.estimateNumSplits(inputFormat, null));
}
@ -203,22 +198,23 @@ public class SqlInputSourceTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
final List<InputRow> expectedRows = testUtils.createTableWithRows(TABLE_1, 10);
try {
final File tempDir = createFirehoseTmpDir("testSingleSplit");
SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper);
SqlInputSource sqlInputSource =
new SqlInputSource(SqlTestUtils.selectFrom(TABLE_1), true, testUtils.getDerbyFirehoseConnector(), mapper);
InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
CloseableIterator<InputRowListPlusRawValues> resultIterator = sqlReader.sample();
final List<InputRowListPlusRawValues> rows = new ArrayList<>();
final List<InputRow> rows = new ArrayList<>();
while (resultIterator.hasNext()) {
InputRowListPlusRawValues row = resultIterator.next();
Assert.assertNull(row.getParseException());
rows.add(row);
rows.addAll(row.getInputRows());
}
assertResult(rows.stream().flatMap(r -> r.getInputRows().stream()).collect(Collectors.toList()), SQLLIST1);
Assert.assertEquals(expectedRows, rows);
}
finally {
testUtils.dropTable(TABLE_NAME_1);
testUtils.dropTable(TABLE_1);
}
}
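// Illustrative note (not part of this commit): sample() yields InputRowListPlusRawValues entries,
// so the test collects getInputRows() and checks getParseException() per entry, whereas
// read(inputStats) above yields InputRow objects directly while accumulating processed bytes.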

View File

@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
@ -33,7 +36,14 @@ import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SqlTestUtils
{
@ -88,7 +98,7 @@ public class SqlTestUtils
}
}
public void createAndUpdateTable(final String tableName, int numEntries)
public List<InputRow> createTableWithRows(final String tableName, int numEntries)
{
derbyConnector.createTable(
tableName,
@ -104,20 +114,30 @@ public class SqlTestUtils
)
);
final List<InputRow> rowsToCreate = IntStream.range(0, numEntries).mapToObj(i -> {
final String timestamp = StringUtils.format("2011-01-12T00:%02d:00.000Z", i);
final Map<String, Object> event = new LinkedHashMap<>();
event.put("a", "a " + i);
event.put("b", "b" + i);
event.put("timestamp", timestamp);
return new MapBasedInputRow(DateTimes.of(timestamp), Arrays.asList("timestamp", "a", "b"), event);
}).collect(Collectors.toList());
derbyConnector.getDBI().withHandle(
(handle) -> {
Batch batch = handle.createBatch();
for (int i = 0; i < numEntries; i++) {
String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
batch.add(StringUtils.format("INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')",
tableName, timestampSt,
i, i
for (InputRow row : rowsToCreate) {
batch.add(StringUtils.format(
"INSERT INTO %1$s (timestamp, a, b) VALUES ('%2$s', '%3$s', '%4$s')",
tableName, row.getTimestamp(), row.getDimension("a").get(0), row.getDimension("b").get(0)
));
}
batch.execute();
return null;
}
);
return rowsToCreate;
}
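// Illustrative example (not part of this commit): for i = 0, createTableWithRows builds a row
// with timestamp "2011-01-12T00:00:00.000Z", a = "a 0", b = "b0", and issues an INSERT roughly
// like the following (table name FOOS_TABLE_1 taken from the tests above):
//
//   INSERT INTO FOOS_TABLE_1 (timestamp, a, b) VALUES ('2011-01-12T00:00:00.000Z', 'a 0', 'b0')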
public void dropTable(final String tableName)
@ -135,4 +155,17 @@ public class SqlTestUtils
{
return derbyFirehoseConnector;
}
/**
* Builds a {@code SELECT timestamp, a, b FROM tableName} query for each of
* the given tables.
*/
public static List<String> selectFrom(String... tableNames)
{
final List<String> selects = new ArrayList<>();
for (String tableName : tableNames) {
selects.add(StringUtils.format("SELECT timestamp, a, b FROM %s", tableName));
}
return selects;
}
}
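// Illustrative usage (not part of this commit): SqlTestUtils.selectFrom("FOOS_TABLE_1", "FOOS_TABLE_2")
// returns ["SELECT timestamp, a, b FROM FOOS_TABLE_1", "SELECT timestamp, a, b FROM FOOS_TABLE_2"],
// which the tests above pass to SqlInputSource and SqlFirehoseFactory in place of the old
// hard-coded SQLLIST constants.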

View File

@ -19,27 +19,22 @@
package org.apache.druid.segment.loading;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.timeline.DataSegment;
import java.io.File;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
*
* Test implementation of {@link SegmentCacheManager} which throws an
* {@link UnsupportedOperationException} on invocation of any method.
*/
public class CacheTestSegmentCacheManager implements SegmentCacheManager
public class NoopSegmentCacheManager implements SegmentCacheManager
{
private final Set<DataSegment> segmentsInTrash = new HashSet<>();
@Override
public boolean isSegmentCached(DataSegment segment)
{
Map<String, Object> loadSpec = segment.getLoadSpec();
return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
throw new UnsupportedOperationException();
}
@Override
@ -63,12 +58,7 @@ public class CacheTestSegmentCacheManager implements SegmentCacheManager
@Override
public void cleanup(DataSegment segment)
{
segmentsInTrash.add(segment);
}
public Set<DataSegment> getSegmentsInTrash()
{
return segmentsInTrash;
throw new UnsupportedOperationException();
}
@Override

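// Illustrative sketch (not part of this commit): tests are expected to extend
// NoopSegmentCacheManager anonymously and override only the methods they exercise, so any
// unexpected call fails fast with UnsupportedOperationException. SegmentLoadDropHandlerTest
// below uses this pattern; a minimal version looks like:
SegmentCacheManager cacheManager = new NoopSegmentCacheManager()
{
  @Override
  public boolean isSegmentCached(DataSegment segment)
  {
    // stub only the behaviour this hypothetical test needs
    return false;
  }
};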
View File

@ -20,8 +20,8 @@
package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
@ -29,7 +29,6 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.input.SqlTestUtils;
import org.apache.druid.segment.TestHelper;
@ -44,21 +43,14 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
public class SqlFirehoseFactoryTest
{
private static final List<File> FIREHOSE_TMP_DIRS = new ArrayList<>();
private static File TEST_DIR;
private final String TABLE_NAME_1 = "FOOS_TABLE_1";
private final String TABLE_NAME_2 = "FOOS_TABLE_2";
private final List<String> SQLLIST1 = ImmutableList.of("SELECT timestamp,a,b FROM FOOS_TABLE_1");
private final List<String> SQLLIST2 = ImmutableList.of(
"SELECT timestamp,a,b FROM FOOS_TABLE_1",
"SELECT timestamp,a,b FROM FOOS_TABLE_2"
);
private static final String TABLE_NAME_1 = "FOOS_TABLE_1";
private static final String TABLE_NAME_2 = "FOOS_TABLE_2";
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@ -93,25 +85,6 @@ public class SqlFirehoseFactoryTest
}
}
private void assertResult(List<Row> rows, List<String> sqls)
{
Assert.assertEquals(10 * sqls.size(), rows.size());
rows.sort(Comparator.comparing(Row::getTimestamp)
.thenComparingInt(r -> Integer.valueOf(r.getDimension("a").get(0)))
.thenComparingInt(r -> Integer.valueOf(r.getDimension("b").get(0))));
int rowCount = 0;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < sqls.size(); j++) {
final Row row = rows.get(rowCount);
String timestampSt = StringUtils.format("2011-01-12T00:0%s:00.000Z", i);
Assert.assertEquals(timestampSt, row.getTimestamp().toString());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("a").get(0)).intValue());
Assert.assertEquals(i, Integer.valueOf(row.getDimension("b").get(0)).intValue());
rowCount++;
}
}
}
private void assertNumRemainingCacheFiles(File firehoseTmpDir, int expectedNumFiles)
{
final String[] files = firehoseTmpDir.list();
@ -136,10 +109,10 @@ public class SqlFirehoseFactoryTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
final List<InputRow> expectedRows = testUtils.createTableWithRows(TABLE_NAME_1, 10);
final SqlFirehoseFactory factory =
new SqlFirehoseFactory(
SQLLIST1,
SqlTestUtils.selectFrom(TABLE_NAME_1),
0L,
0L,
0L,
@ -157,7 +130,7 @@ public class SqlFirehoseFactoryTest
}
}
assertResult(rows, SQLLIST1);
Assert.assertEquals(expectedRows, rows);
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
testUtils.dropTable(TABLE_NAME_1);
}
@ -168,10 +141,10 @@ public class SqlFirehoseFactoryTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
final List<InputRow> expectedRows = testUtils.createTableWithRows(TABLE_NAME_1, 10);
final SqlFirehoseFactory factory =
new SqlFirehoseFactory(
SQLLIST1,
SqlTestUtils.selectFrom(TABLE_NAME_1),
0L,
null,
null,
@ -190,7 +163,7 @@ public class SqlFirehoseFactoryTest
}
}
assertResult(rows, SQLLIST1);
Assert.assertEquals(expectedRows, rows);
assertNumRemainingCacheFiles(firehoseTmpDir, 0);
testUtils.dropTable(TABLE_NAME_1);
}
@ -201,12 +174,12 @@ public class SqlFirehoseFactoryTest
{
derbyConnector = derbyConnectorRule.getConnector();
SqlTestUtils testUtils = new SqlTestUtils(derbyConnector);
testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
testUtils.createAndUpdateTable(TABLE_NAME_2, 10);
final List<InputRow> expectedRowsTable1 = testUtils.createTableWithRows(TABLE_NAME_1, 10);
final List<InputRow> expectedRowsTable2 = testUtils.createTableWithRows(TABLE_NAME_2, 10);
final SqlFirehoseFactory factory = new
SqlFirehoseFactory(
SQLLIST2,
SqlTestUtils.selectFrom(TABLE_NAME_1, TABLE_NAME_2),
null,
null,
0L,
@ -224,7 +197,8 @@ public class SqlFirehoseFactoryTest
}
}
assertResult(rows, SQLLIST2);
Assert.assertEquals(expectedRowsTable1, rows.subList(0, 10));
Assert.assertEquals(expectedRowsTable2, rows.subList(10, 20));
assertNumRemainingCacheFiles(firehoseTmpDir, 2);
testUtils.dropTable(TABLE_NAME_1);
testUtils.dropTable(TABLE_NAME_2);

View File

@ -26,14 +26,15 @@ import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.CacheTestSegmentCacheManager;
import org.apache.druid.segment.loading.CacheTestSegmentLoader;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
@ -84,13 +85,12 @@ public class SegmentLoadDropHandlerTest
private TestStorageLocation testStorageLocation;
private AtomicInteger announceCount;
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
private CacheTestSegmentCacheManager segmentCacheManager;
private SegmentLoader segmentLoader;
private SegmentCacheManager segmentCacheManager;
private Set<DataSegment> segmentsRemovedFromCache;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
private SegmentLoaderConfig segmentLoaderConfigNoLocations;
private ScheduledExecutorFactory scheduledExecutorFactory;
private List<StorageLocationConfig> locations;
@ -122,9 +122,24 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable = new ArrayList<>();
segmentCacheManager = new CacheTestSegmentCacheManager();
segmentLoader = new CacheTestSegmentLoader();
segmentManager = new SegmentManager(segmentLoader);
segmentsRemovedFromCache = new HashSet<>();
segmentCacheManager = new NoopSegmentCacheManager()
{
@Override
public boolean isSegmentCached(DataSegment segment)
{
Map<String, Object> loadSpec = segment.getLoadSpec();
return new File(MapUtils.getString(loadSpec, "cacheDir")).exists();
}
@Override
public void cleanup(DataSegment segment)
{
segmentsRemovedFromCache.add(segment);
}
};
segmentManager = new SegmentManager(new CacheTestSegmentLoader());
segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
announceCount = new AtomicInteger(0);
@ -230,28 +245,6 @@ public class SegmentLoadDropHandlerTest
}
};
segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
{
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
};
scheduledExecutorFactory = new ScheduledExecutorFactory()
{
@Override
@ -314,7 +307,7 @@ public class SegmentLoadDropHandlerTest
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment));
segmentLoadDropHandler.stop();
}
@ -353,7 +346,7 @@ public class SegmentLoadDropHandlerTest
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentCacheManager.getSegmentsInTrash().contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentsRemovedFromCache.contains(segment));
segmentLoadDropHandler.stop();
}