DruidSegmentReader should work if timestamp is specified as a dimension (#9530)

* DruidSegmentReader should work if timestamp is specified as a dimension

* Add integration tests

Tests for compaction and re-indexing a datasource with the timestamp column

* Instructions to run integration tests against quickstart

* Address PR feedback
Suneet Saldanha 2020-03-25 13:47:34 -07:00 committed by GitHub
parent 3f521943fc
commit 55c08e0746
7 changed files with 283 additions and 22 deletions

View File

@@ -27,7 +27,6 @@ import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -61,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
@@ -122,7 +122,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
{
final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN);
final DateTime timestamp = (DateTime) intermediateRow.get(ColumnHolder.TIME_COLUMN_NAME);
return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
}
@@ -209,8 +209,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
{
this.cursor = cursor;
timestampColumnSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
timestampColumnSelector = cursor
.getColumnSelectorFactory()
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
dimSelectors = new HashMap<>();
for (String dim : dimensionNames) {
@@ -225,8 +226,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
metSelectors = new HashMap<>();
for (String metric : metricNames) {
final BaseObjectColumnValueSelector metricSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
final BaseObjectColumnValueSelector metricSelector = cursor
.getColumnSelectorFactory()
.makeColumnValueSelector(metric);
metSelectors.put(metric, metricSelector);
}
}
@@ -240,9 +242,10 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
public Map<String, Object> next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getLong();
theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));
for (Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
@@ -270,6 +273,15 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
theEvent.put(metric, value);
}
}
// Timestamp is added last because we expect that the time column will always be a DateTime object.
// If it is added earlier, it can be overwritten by metrics or dimensions with the same name.
//
// If a user names a metric or dimension `__time`, it will be overwritten. This case should be rare since
// __time is reserved for the time column in Druid segments.
final long timestamp = timestampColumnSelector.getLong();
theEvent.put(ColumnHolder.TIME_COLUMN_NAME, DateTimes.utc(timestamp));
cursor.advance();
return theEvent;
}

View File

@@ -66,7 +66,40 @@ Integration tests can also be run with either Java 8 or Java 11 by adding -Djvm.
can either be 8 or 11.
Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>.
The file must contain one property per line, the key must start with druid_ and the format should be snake case.
The file must contain one property per line, the key must start with `druid_` and the format should be snake case.
Running Tests Using A Quickstart Cluster
-------------------
When writing integration tests, it can be helpful to test against a quickstart
cluster so that you can set up remote debugging within your developer
environment. This section walks you through setting up the integration tests
so that they can run against a quickstart cluster running on your development
machine.
Note that not all features run by default on a quickstart cluster, so it may
not make sense to run the entire test suite against this configuration.
Make sure you have at least 6GB of memory available before you run the tests.
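If you are unsure how much memory is available, one quick way to check (Linux-specific, shown here only as a convenience) is:
```
# show total / used / free memory in human-readable units
free -h
```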
The tests rely on files in the `test/resources` folder to exist under the path `/resources`,
so create a symlink to make them available:
```
ln -s ${DRUID_HOME}/integration-tests/src/test/resources /resources
```
Set the cluster config file environment variable to the quickstart config
```
export CONFIG_FILE=${DRUID_HOME}/integration-tests/quickstart-it.json
```
Note that quickstart does not run with SSL, so to trick the integration tests
we specify each `*_tls_url` in the config to be the same as the plain HTTP URL.
Then run the tests using a command similar to:
```
mvn verify -P int-tests-config-file -Dit.test=<test_name>
```
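For example, to run just the compaction tests modified in this change (the concrete test class is only an illustration; substitute any test name):
```
# run a single TestNG class against the cluster described by CONFIG_FILE
mvn verify -P int-tests-config-file -Dit.test=ITCompactionTaskTest
```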
Running Tests Using A Configuration File for Any Cluster
-------------------
@@ -90,7 +123,7 @@ To run tests on any druid cluster that is already running, create a configuratio
"cloud_path": "<(optional) cloud_path for test data if running cloud integration test>",
}
Set the environment variable CONFIG_FILE to the name of the configuration file:
Set the environment variable `CONFIG_FILE` to the name of the configuration file:
```
export CONFIG_FILE=<config file name>
```

View File

@@ -0,0 +1,16 @@
{
"broker_host" : "localhost",
"broker_port" : "8082",
"broker_tls_url" : "http://localhost:8082",
"router_host" : "localhost",
"router_port" : "8888",
"router_tls_url" : "http://localhost:8888",
"indexer_host" : "localhost",
"indexer_port" : "8081",
"historical_host" : "localhost",
"historical_port" : "8083",
"coordinator_host" : "localhost",
"coordinator_port" : "8081",
"middlemanager_host": "localhost",
"zookeeper_hosts": "localhost:2181"
}

View File

@@ -28,13 +28,14 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -46,33 +47,47 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";
@Inject
private IntegrationTestingConfig config;
private String fullDatasourceName;
@BeforeSuite
public void setFullDatasourceName()
@BeforeMethod
public void setFullDatasourceName(Method method)
{
fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
}
@Test
public void testCompaction() throws Exception
{
loadData();
loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
}
@Test
public void testCompactionWithTimestampDimension() throws Exception
{
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
}
private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception
{
loadData(indexTask);
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
try (final Closeable ignored = unloader(fullDatasourceName)) {
String queryResponseTemplate;
try {
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queriesResource);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
throw new ISE(e, "could not read query file: %s", queriesResource);
}
queryResponseTemplate = StringUtils.replace(
@@ -92,10 +107,9 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
private void loadData() throws Exception
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(INDEX_TASK);
String taskSpec = getResourceAsString(indexTask);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);

View File

@@ -34,16 +34,26 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
// TODO: add queries that validate timestamp is different from the __time column since it is a dimension
// TODO: https://github.com/apache/druid/issues/9565
private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
@Test
public void testIndexData() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
INDEX_DATASOURCE,
@@ -55,10 +65,49 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
);
doReindexTest(
INDEX_DATASOURCE,
REINDEX_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE
);
doReindexTest(
INDEX_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE
);
}
}
@Test
public void testReIndexDataWithTimestamp() throws Exception
{
final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
INDEX_WITH_TIMESTAMP_TASK,
INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
false,
true,
true
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE
);
doReindexTest(
INDEX_WITH_TIMESTAMP_DATASOURCE,
reindexDatasourceWithDruidInputSource,
REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
REINDEX_QUERIES_RESOURCE
);
}
}
}

View File

@@ -0,0 +1,51 @@
{
"type": "index",
"spec": {
"ioConfig": {
"type": "index",
"inputSource": {
"type": "druid",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-01"
}
},
"tuningConfig": {
"type": "index",
"partitionsSpec": {
"type": "dynamic"
}
},
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"granularitySpec": {
"type": "uniform",
"queryGranularity": "SECOND",
"segmentGranularity": "DAY"
},
"timestampSpec": {
"column": "__time",
"format": "iso"
},
"dimensionsSpec": {
"dimensionExclusions" : ["robot", "continent"]
},
"metricsSpec": [
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
]
}
}
}

View File

@@ -0,0 +1,86 @@
{
"type": "index",
"spec": {
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
},
{
"name": "thetaSketch",
"type": "thetaSketch",
"fieldName": "user"
},
{
"name": "quantilesDoublesSketch",
"type": "quantilesDoublesSketch",
"fieldName": "delta"
},
{
"name": "HLLSketchBuild",
"type": "HLLSketchBuild",
"fieldName": "user"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {
"format" : "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
{"type": "string", "name": "language", "createBitmapIndex": false},
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
"timestamp"
]
}
}
}
},
"ioConfig": {
"type": "index",
"firehose": {
"type": "local",
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index",
"maxRowsPerSegment": 3
}
}
}