change hadoop inputSource IT to use parallel batch ingestion (#9616)

Maytas Monsereenusorn 2020-04-07 08:37:37 -10:00 committed by GitHub
parent d267b1c414
commit 73a6baaeb6
5 changed files with 12 additions and 6 deletions

AbstractHdfsInputSourceSimpleIndexTest.java renamed to AbstractHdfsInputSourceParallelIndexTest.java

@@ -20,6 +20,7 @@
 package org.apache.druid.tests.indexer;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.testng.annotations.DataProvider;
@@ -29,9 +30,9 @@ import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
-public abstract class AbstractHdfsInputSourceSimpleIndexTest extends AbstractITBatchIndexTest
+public abstract class AbstractHdfsInputSourceParallelIndexTest extends AbstractITBatchIndexTest
 {
-  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_simple_index_task.json";
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static final String INDEX_DATASOURCE = "wikipedia_index_test_" + UUID.randomUUID();
   private static final String INPUT_SOURCE_PATHS_KEY = "paths";
@@ -70,6 +71,11 @@ public abstract class AbstractHdfsInputSourceSimpleIndexTest extends AbstractITBatchIndexTest
         "%%INPUT_SOURCE_TYPE%%",
         "hdfs"
     );
+    spec = StringUtils.replace(
+        spec,
+        "%%PARTITIONS_SPEC%%",
+        jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+    );
     spec = StringUtils.replace(
         spec,
         "%%INPUT_SOURCE_PROPERTY_KEY%%",

ITHdfsToAzureSimpleIndexTest.java renamed to ITHdfsToAzureParallelIndexTest.java

@@ -37,7 +37,7 @@ import java.util.List;
  */
 @Test(groups = TestNGGroup.AZURE_DEEP_STORAGE)
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITHdfsToAzureSimpleIndexTest extends AbstractHdfsInputSourceSimpleIndexTest
+public class ITHdfsToAzureParallelIndexTest extends AbstractHdfsInputSourceParallelIndexTest
 {
   @Test(dataProvider = "resources")
   public void testHdfsIndexData(Pair<String, List> hdfsInputSource) throws Exception

ITHdfsToGcsSimpleIndexTest.java renamed to ITHdfsToGcsParallelIndexTest.java

@@ -38,7 +38,7 @@ import java.util.List;
  */
 @Test(groups = TestNGGroup.GCS_DEEP_STORAGE)
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITHdfsToGcsSimpleIndexTest extends AbstractHdfsInputSourceSimpleIndexTest
+public class ITHdfsToGcsParallelIndexTest extends AbstractHdfsInputSourceParallelIndexTest
 {
   @Test(dataProvider = "resources")
   public void testHdfsIndexData(Pair<String, List> hdfsInputSource) throws Exception

ITHdfsToHdfsSimpleIndexTest.java renamed to ITHdfsToHdfsParallelIndexTest.java

@@ -36,7 +36,7 @@ import java.util.List;
  */
 @Test(groups = TestNGGroup.HDFS_DEEP_STORAGE)
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITHdfsToHdfsSimpleIndexTest extends AbstractHdfsInputSourceSimpleIndexTest
+public class ITHdfsToHdfsParallelIndexTest extends AbstractHdfsInputSourceParallelIndexTest
 {
   @Test(dataProvider = "resources")
   public void testHdfsIndexData(Pair<String, List> hdfsInputSource) throws Exception

ITHdfsToS3SimpleIndexTest.java renamed to ITHdfsToS3ParallelIndexTest.java

@@ -37,7 +37,7 @@ import java.util.List;
  */
 @Test(groups = TestNGGroup.S3_DEEP_STORAGE)
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITHdfsToS3SimpleIndexTest extends AbstractHdfsInputSourceSimpleIndexTest
+public class ITHdfsToS3ParallelIndexTest extends AbstractHdfsInputSourceParallelIndexTest
 {
   @Test(dataProvider = "resources")
   public void testHdfsIndexData(Pair<String, List> hdfsInputSource) throws Exception
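
The four deep-storage variants above only swap their base class; the test inputs still come from the base class's TestNG data provider named "resources", which hands each run a Pair of the inputSource property key and its values. A rough sketch of that wiring follows, with hypothetical provider values (the real keys and HDFS paths live in AbstractHdfsInputSourceParallelIndexTest).

import java.util.List;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Pair;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public abstract class AbstractProviderSketch
{
  // TestNG resolves the provider by method name, so subclasses can simply
  // reference dataProvider = "resources" and inherit this method.
  @DataProvider
  public static Object[][] resources()
  {
    return new Object[][]{
        // Hypothetical key/path values, for illustration only.
        {new Pair<>("paths", ImmutableList.of("hdfs://namenode:9000/batch_index/json"))}
    };
  }
}

// A concrete variant just extends the base class and consumes the inherited provider.
class ProviderSketchTest extends AbstractProviderSketch
{
  @Test(dataProvider = "resources")
  public void testHdfsIndexData(Pair<String, List> hdfsInputSource) throws Exception
  {
    // The real tests build the substituted task spec here and run the shared
    // ingest-and-query flow from AbstractITBatchIndexTest.
  }
}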