Add an integration test for null-only columns (#12365)

* integration test for null-only columns

* metadata query

* fix test
Jihoon Son 2022-03-28 16:40:45 -07:00 committed by GitHub
parent 9ed7aa33ec
commit 49a3f4291a
7 changed files with 273 additions and 11 deletions

TaskResponseObject.java

@@ -49,31 +49,31 @@ public class TaskResponseObject
this.status = status;
}
@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public String getId()
{
return id;
}
@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public String getType()
{
return type;
}
@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
}
@SuppressWarnings("unused") // Used by Jackson serialization?
@JsonProperty
public TaskState getStatus()
{
return status;

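Note: the getters above are invoked reflectively by Jackson during serialization, which is why each carries @SuppressWarnings("unused"). A minimal, self-contained sketch of the pattern (the class and values below are illustrative stand-ins, not part of this commit):

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative stand-in for TaskResponseObject: Jackson discovers the
// @JsonProperty getter reflectively, so static analysis sees it as unused.
public class JsonGetterSketch
{
  public static class Task
  {
    private final String id;

    Task(String id)
    {
      this.id = id;
    }

    @SuppressWarnings("unused") // invoked reflectively by Jackson
    @JsonProperty
    public String getId()
    {
      return id;
    }
  }

  public static void main(String[] args) throws Exception
  {
    // Prints {"id":"index_kafka_example"}
    System.out.println(new ObjectMapper().writeValueAsString(new Task("index_kafka_example")));
  }
}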
AbstractIndexerTest.java

@@ -31,7 +31,9 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
@@ -62,6 +64,10 @@ public abstract class AbstractIndexerTest
protected ObjectMapper smileMapper;
@Inject
protected TestQueryHelper queryHelper;
@Inject
protected SqlTestQueryHelper sqlQueryHelper;
@Inject
protected DataLoaderHelper dataLoaderHelper;
@Inject
protected IntegrationTestingConfig config;

AbstractKafkaIndexingServiceTest.java

@@ -29,6 +29,7 @@ import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
@@ -53,6 +54,7 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
)
{
@@ -117,13 +119,16 @@
"%%STREAM_PROPERTIES_KEY%%",
"consumerProperties"
);
spec = StringUtils.replace(
spec,
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
);
spec = StringUtils.replace(
spec,
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",

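Note: both streaming test base classes now splice the dimension list into the supervisor spec template by serializing it to a JSON array. A rough, self-contained sketch of that substitution (plain String.replace stands in for Druid's StringUtils.replace, and the template snippet is abbreviated):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;

public class DimensionsPlaceholderSketch
{
  public static void main(String[] args) throws Exception
  {
    List<String> dimensions = Arrays.asList("page", "language", "nilDim1");
    String template = "{\"dimensionsSpec\": {\"dimensions\": %%DIMENSIONS%%}}";
    // writeValueAsString renders the list as a JSON array, e.g. ["page","language","nilDim1"]
    String json = new ObjectMapper().writeValueAsString(dimensions);
    // Prints {"dimensionsSpec": {"dimensions": ["page","language","nilDim1"]}}
    System.out.println(template.replace("%%DIMENSIONS%%", json));
  }
}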
AbstractKinesisIndexingServiceTest.java

@@ -28,6 +28,7 @@ import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
import javax.annotation.Nullable;
import java.util.List;
import java.util.function.Function;
public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
@@ -59,6 +60,7 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
)
{
@@ -122,6 +124,11 @@
"%%SCHEMA_REGISTRY_HOST%%",
StringUtils.format("http://%s", config.getSchemaRegistryInternalHost())
);
spec = StringUtils.replace(
spec,
"%%DIMENSIONS%%",
jsonMapper.writeValueAsString(dimensions)
);
return StringUtils.replace(
spec,
"%%STREAM_PROPERTIES_VALUE%%",

AbstractStreamIndexingTest.java

@@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
@@ -73,7 +74,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// The value of this tag is a timestamp that a lambda function can use to remove unused streams.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
- private static final long CYCLE_PADDING_MS = 100;
+ protected static final long CYCLE_PADDING_MS = 100;
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
@@ -93,9 +94,24 @@
protected static final String INPUT_FORMAT = "inputFormat";
protected static final String INPUT_ROW_PARSER = "parser";
- private static final String JSON_INPUT_FORMAT_PATH =
+ protected static final String JSON_INPUT_FORMAT_PATH =
String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json");
protected static final List<String> DEFAULT_DIMENSIONS = ImmutableList.of(
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
);
@Inject
private DruidClusterAdminClient druidClusterAdminClient;
@@ -117,6 +133,7 @@
String fullDatasourceName,
String parserType,
String parserOrInputFormat,
List<String> dimensions,
IntegrationTestingConfig config
);
@@ -625,7 +642,7 @@
}
}
- private void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception
+ protected void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception
{
// Wait for supervisor to consume events
LOG.info("Waiting for stream indexing tasks to consume events");
@@ -720,6 +737,11 @@
private Function<String, String> streamQueryPropsTransform;
public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
{
this(parserType, parserOrInputFormat, DEFAULT_DIMENSIONS);
}
public GeneratedTestConfig(String parserType, String parserOrInputFormat, List<String> dimensions) throws Exception
{
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
@@ -741,6 +763,7 @@
fullDatasourceName,
parserType,
parserOrInputFormat,
dimensions,
config
);
streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);

ITNilColumnTest.java (new file)

@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.JsonEventSerializer;
import org.apache.druid.testing.utils.SqlQueryWithResults;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testing.utils.StreamGenerator;
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest
{
private static final Logger LOG = new Logger(ITNilColumnTest.class);
private static final String NIL_DIM1 = "nilDim1";
private static final String NIL_DIM2 = "nilDim2";
private final List<String> dimensions;
public ITNilColumnTest()
{
this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2);
dimensions.add(NIL_DIM1);
dimensions.addAll(DEFAULT_DIMENSIONS);
dimensions.add(NIL_DIM2);
}
@Override
public String getTestNamePrefix()
{
return "nil-column-test";
}
@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}
@Test
public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH),
dimensions
);
try (
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, true)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
// Generate and write the full set of test events
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
TOTAL_NUMBER_OF_SECOND,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before it is terminated below
ITRetryUtil.retryUntil(
() -> BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor to be healthy"
);
// 60 events should have been ingested as per EVENTS_PER_SECOND and TOTAL_NUMBER_OF_SECOND.
// Since maxRowsInMemory is set to 500,000, every row should be in incrementalIndex.
// So, let's test if SQL finds nil dimensions from incrementalIndexes.
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
final List<SqlQueryWithResults> queryWithResults = getQueryWithResults(generatedTestConfig);
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
final List<SqlQueryWithResults> metadataQueryWithResults = getMetadataQueryWithResults(generatedTestConfig);
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
// Terminate the supervisor
indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
ITRetryUtil.retryUntilTrue(
() -> {
List<TaskResponseObject> tasks = indexer
.getRunningTasks()
.stream()
.filter(task -> task.getId().contains(generatedTestConfig.getFullDatasourceName()))
.filter(task -> "index_kafka".equals(task.getType()))
.collect(Collectors.toList());
LOG.info("[%s] tasks are running", tasks.stream().map(task -> {
try {
return jsonMapper.writeValueAsString(task);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList()));
return tasks.isEmpty();
},
"Waiting for all tasks to stop"
);
// Now, we should have published all segments.
// Let's test if SQL finds nil dimensions from queryableIndexes.
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
verifyIngestedData(generatedTestConfig, numWritten);
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
}
}
private static List<SqlQueryWithResults> getQueryWithResults(GeneratedTestConfig generatedTestConfig)
{
return ImmutableList.of(
new SqlQueryWithResults(
new SqlQuery(
StringUtils.format(
"SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS NOT NULL",
generatedTestConfig.getFullDatasourceName(),
NIL_DIM1,
NIL_DIM2
),
null,
false,
false,
false,
null,
null
),
ImmutableList.of(ImmutableMap.of("EXPR$0", 0))
)
);
}
private List<SqlQueryWithResults> getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig)
{
return ImmutableList.of(
new SqlQueryWithResults(
new SqlQuery(
StringUtils.format(
"SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
+ " FROM INFORMATION_SCHEMA.COLUMNS"
+ " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s', '%s')",
generatedTestConfig.getFullDatasourceName(),
NIL_DIM1,
NIL_DIM2
),
null,
false,
false,
false,
null,
null
),
ImmutableList.of(
ImmutableMap.of(
"COLUMN_NAME",
NIL_DIM1,
"IS_NULLABLE",
"YES",
"DATA_TYPE",
"VARCHAR"
),
ImmutableMap.of(
"COLUMN_NAME",
NIL_DIM2,
"IS_NULLABLE",
"YES",
"DATA_TYPE",
"VARCHAR"
)
)
)
);
}
}
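Note: the same nil-column check that this test automates can be issued by hand against a cluster through Druid's SQL HTTP endpoint (POST /druid/v2/sql). A hedged sketch; the router address and datasource name below are placeholders, not values from this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NilColumnManualCheck
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder datasource name; a real run would use the generated one,
    // which carries a random UUID suffix.
    String body = "{\"query\": \"SELECT count(*) FROM \\\"nil-column-test_indexing_service_test\\\""
                  + " WHERE nilDim1 IS NOT NULL OR nilDim2 IS NOT NULL\"}";
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8888/druid/v2/sql")) // assumed router address
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    // Expect [{"EXPR$0":0}] both before and after the segments are published.
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}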

supervisor_spec_template.json

@@ -8,7 +8,7 @@
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensions": %%DIMENSIONS%%,
"dimensionExclusions": [],
"spatialDimensions": []
},