Reducing testing time for KafkaIndexTaskTest and KafkaSupervisorTest (#4352)

Jihoon Son 2017-06-03 00:53:07 +09:00 committed by Gian Merlino
parent f876246af7
commit da32e1ae53
4 changed files with 248 additions and 187 deletions

KafkaIndexTask.java

@@ -121,7 +121,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private static final String TYPE = "index_kafka";
   private static final Random RANDOM = new Random();
   private static final long POLL_TIMEOUT = 100;
-  private static final long POLL_RETRY_MS = 30000;
   private static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
   private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";

@@ -182,6 +181,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private volatile boolean pauseRequested = false;
   private volatile long pauseMillis = 0;

+  // This value can be tuned in some tests
+  private long pollRetryMs = 30000;
+
   @JsonCreator
   public KafkaIndexTask(
       @JsonProperty("id") String id,

@@ -210,6 +212,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
   }

+  @VisibleForTesting
+  void setPollRetryMs(long retryMs)
+  {
+    this.pollRetryMs = retryMs;
+  }
+
   private static String makeTaskId(String dataSource, int randomBits)
   {
     final StringBuilder suffix = new StringBuilder(8);

@@ -1055,10 +1063,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       if (doReset) {
         sendResetRequestAndWait(resetPartitions, taskToolbox);
       } else {
-        log.warn("Retrying in %dms", POLL_RETRY_MS);
+        log.warn("Retrying in %dms", pollRetryMs);
         pollRetryLock.lockInterruptibly();
         try {
-          long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
+          long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
           while (nanos > 0L && !pauseRequested && !stopRequested) {
             nanos = isAwaitingRetry.awaitNanos(nanos);
           }
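
The change above turns the POLL_RETRY_MS constant into an instance field with a package-private @VisibleForTesting setter, so a test in the same package can shrink the 30-second retry wait to a token value. A standalone sketch of the same shape (PollingWorker and retryLater are hypothetical names, not Druid code):

import com.google.common.annotations.VisibleForTesting;

public class PollingWorker
{
  // Production default; tests in the same package may lower it via the setter below.
  private long pollRetryMs = 30_000;

  @VisibleForTesting
  void setPollRetryMs(long retryMs)
  {
    this.pollRetryMs = retryMs;
  }

  void retryLater() throws InterruptedException
  {
    // Sleeps 30 s in production, but only as long as a test configured.
    Thread.sleep(pollRetryMs);
  }
}

KafkaIndexTaskTest applies exactly this knob in its createTask() helper further down, passing its own POLL_RETRY_MS constant of 100 ms.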

KafkaIndexTaskTest.java

@@ -122,8 +122,10 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -145,66 +147,78 @@ import java.util.concurrent.TimeoutException;
 @RunWith(Parameterized.class)
 public class KafkaIndexTaskTest
 {
+  private static final Logger log = new Logger(KafkaIndexTaskTest.class);
+  private static final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private static final long POLL_RETRY_MS = 100;
+
+  private static TestingCluster zkServer;
+  private static TestBroker kafkaServer;
+  private static ServiceEmitter emitter;
+  private static ListeningExecutorService taskExec;
+  private static int topicPostfix;
+
+  private final List<Task> runningTasks = Lists.newArrayList();
   private final boolean buildV9Directly;

   private long handoffConditionTimeout = 0;
   private boolean reportParseExceptions = false;
   private boolean doHandoff = true;
-  private TestingCluster zkServer;
-  private TestBroker kafkaServer;
-  private ServiceEmitter emitter;
-  private ListeningExecutorService taskExec;
   private TaskToolboxFactory toolboxFactory;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskStorage taskStorage;
   private TaskLockbox taskLockbox;
   private File directory;
-  private final List<Task> runningTasks = Lists.newArrayList();
-
-  private static final Logger log = new Logger(KafkaIndexTaskTest.class);
-  private static final ObjectMapper objectMapper = new DefaultObjectMapper();
-
-  private static final DataSchema DATA_SCHEMA;
-
-  private static final List<ProducerRecord<byte[], byte[]>> RECORDS = ImmutableList.of(
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2008", "a", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2009", "b", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2010", "c", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "d", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, null),
-      new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f))
-  );
-
-  static {
-    DATA_SCHEMA = new DataSchema(
-        "test_ds",
-        objectMapper.convertValue(
-            new StringInputRowParser(
-                new JSONParseSpec(
-                    new TimestampSpec("timestamp", "iso", null),
-                    new DimensionsSpec(
-                        DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of("dim1", "dim2")),
-                        null,
-                        null
-                    ),
-                    new JSONPathSpec(true, ImmutableList.<JSONPathFieldSpec>of()),
-                    ImmutableMap.<String, Boolean>of()
-                ),
-                Charsets.UTF_8.name()
-            ),
-            Map.class
-        ),
-        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
-        objectMapper
-    );
-  }
+  private String topic;
+  private List<ProducerRecord<byte[], byte[]>> records;
+
+  private static final DataSchema DATA_SCHEMA = new DataSchema(
+      "test_ds",
+      objectMapper.convertValue(
+          new StringInputRowParser(
+              new JSONParseSpec(
+                  new TimestampSpec("timestamp", "iso", null),
+                  new DimensionsSpec(
+                      DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of("dim1", "dim2")),
+                      null,
+                      null
+                  ),
+                  new JSONPathSpec(true, ImmutableList.<JSONPathFieldSpec>of()),
+                  ImmutableMap.<String, Boolean>of()
+              ),
+              Charsets.UTF_8.name()
+          ),
+          Map.class
+      ),
+      new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+      objectMapper
+  );
+
+  private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
+  {
+    return ImmutableList.of(
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, "unparseable".getBytes()),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", "y", 1.0f))
+    );
+  }
+
+  private static String getTopicName()
+  {
+    return "topic" + topicPostfix++;
+  }
+
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
   @Parameterized.Parameters(name = "buildV9Directly = {0}")
   public static Iterable<Object[]> constructorFeeder()
   {
@@ -216,14 +230,11 @@ public class KafkaIndexTaskTest
     this.buildV9Directly = buildV9Directly;
   }

-  @Rule
-  public final TemporaryFolder tempFolder = new TemporaryFolder();
-
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

-  @Before
-  public void setUp() throws Exception
+  @BeforeClass
+  public static void setupClass() throws Exception
   {
     emitter = new ServiceEmitter(
         "service",

@@ -237,14 +248,12 @@ public class KafkaIndexTaskTest
     emitter.start();
     EmittingLogger.registerEmitter(emitter);

-    makeToolboxFactory();
-
     zkServer = new TestingCluster(1);
     zkServer.start();

     kafkaServer = new TestBroker(
         zkServer.getConnectString(),
-        tempFolder.newFolder(),
+        null,
         1,
         ImmutableMap.of("num.partitions", "2")
     );

@@ -255,17 +264,22 @@ public class KafkaIndexTaskTest
             Execs.makeThreadFactory("kafka-task-test-%d")
         )
     );
+  }
+
+  @Before
+  public void setupTest() throws IOException
+  {
     handoffConditionTimeout = 0;
     reportParseExceptions = false;
     doHandoff = true;
+    topic = getTopicName();
+    records = generateRecords(topic);
+    makeToolboxFactory();
   }

   @After
-  public void tearDown() throws Exception
+  public void tearDownTest()
   {
-    emitter.close();
-
     synchronized (runningTasks) {
       for (Task task : runningTasks) {
         task.stopGracefully();

@@ -274,6 +288,12 @@ public class KafkaIndexTaskTest
       runningTasks.clear();
     }

+    destroyToolboxFactory();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception
+  {
     taskExec.shutdown();
     taskExec.awaitTermination(9999, TimeUnit.DAYS);

@@ -283,7 +303,7 @@ public class KafkaIndexTaskTest
     zkServer.stop();
     zkServer = null;

-    destroyToolboxFactory();
+    emitter.close();
   }

   @Test(timeout = 60_000L)
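
The hunks above move the expensive fixtures (the ZooKeeper test cluster, the Kafka broker, the emitter, and the task executor) from per-test @Before/@After methods into class-level @BeforeClass/@AfterClass methods, while the cheap per-test state (topic name, records, toolbox factory) is still reset for every test. A minimal, self-contained sketch of that JUnit 4 lifecycle split, with a plain ExecutorService standing in for the ZooKeeper/Kafka fixtures (SharedFixtureTest and its members are illustrative, not Druid code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class SharedFixtureTest
{
  private static ExecutorService sharedExec;  // expensive: started once per class
  private static int topicPostfix;            // counter that makes each test's topic unique

  private String topic;                       // cheap per-test state

  @BeforeClass
  public static void setupClass()
  {
    sharedExec = Executors.newFixedThreadPool(2);
  }

  @Before
  public void setupTest()
  {
    topic = "topic" + topicPostfix++;  // fresh topic so tests don't see each other's data
  }

  @AfterClass
  public static void tearDownClass() throws InterruptedException
  {
    sharedExec.shutdown();
    sharedExec.awaitTermination(10, TimeUnit.SECONDS);
  }

  @Test
  public void testSomething()
  {
    // per-test logic would use `topic` and submit work to `sharedExec`
  }
}

Because the broker now outlives a single test, each test also gets its own topic (getTopicName() with a static counter) and its own copy of the fixture records, so data written by one test can never satisfy another test's assertions.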
@@ -291,7 +311,7 @@
   {
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -300,8 +320,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -327,7 +347,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -343,8 +363,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -364,7 +384,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -382,7 +402,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -398,8 +418,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 0L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -419,7 +439,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -437,7 +457,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -451,7 +471,7 @@
   {
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -460,8 +480,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -493,7 +513,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -502,8 +522,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -529,7 +549,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -546,7 +566,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -555,8 +575,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -582,7 +602,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -598,7 +618,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -607,8 +627,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -641,8 +661,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -656,8 +676,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
            kafkaServer.consumerProperties(),
             true,
             false,

@@ -673,7 +693,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -695,7 +715,7 @@
     SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -711,8 +731,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -726,8 +746,8 @@
         null,
         new KafkaIOConfig(
             "sequence1",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -740,7 +760,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -766,7 +786,7 @@
     SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -782,8 +802,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             false,
             false,

@@ -797,8 +817,8 @@
         null,
         new KafkaIOConfig(
             "sequence1",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
             kafkaServer.consumerProperties(),
             false,
             false,

@@ -811,7 +831,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -858,8 +878,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -874,9 +894,10 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.flush();
     }

     // Wait for tasks to exit
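
The hunk above also adds kafkaProducer.flush() after the send loop. Each send(record).get() already blocks on the broker's acknowledgement, so the flush is belt-and-braces: it blocks until every record handed to the producer has completed, making the "all fixture data is on the broker before we assert" intent explicit. A hedged sketch of that publish-then-assert helper (ProducerTestUtil and publishAndAwait are hypothetical names, not part of this commit):

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class ProducerTestUtil
{
  private ProducerTestUtil() {}

  // Publish fixture records and return only once they are all on the broker,
  // so assertions that read them back never race the producer.
  public static void publishAndAwait(
      KafkaProducer<byte[], byte[]> producer,
      List<ProducerRecord<byte[], byte[]>> records
  ) throws ExecutionException, InterruptedException
  {
    for (ProducerRecord<byte[], byte[]> record : records) {
      producer.send(record).get();  // block on each acknowledgement
    }
    producer.flush();  // drain anything still buffered client-side
  }
}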
@@ -894,7 +915,7 @@
     SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -916,8 +937,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -931,8 +952,8 @@
         null,
         new KafkaIOConfig(
             "sequence1",
-            new KafkaPartitions("topic0", ImmutableMap.of(1, 0L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)),
+            new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
+            new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -948,7 +969,7 @@
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -971,7 +992,7 @@
     SegmentDescriptor desc3 = SD(task2, "2012/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 1L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -988,8 +1009,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -1004,7 +1025,7 @@
     // Insert some data, but not enough for the task to finish
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(RECORDS, 4)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 4)) {
         kafkaProducer.send(record).get();
       }
     }

@@ -1024,8 +1045,8 @@
         task1.getId(),
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -1040,7 +1061,7 @@
     // Insert remaining data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(RECORDS, 4)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 4)) {
         kafkaProducer.send(record).get();
       }
     }

@@ -1061,7 +1082,7 @@
     SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -1077,8 +1098,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -1093,9 +1114,10 @@
     // Insert some data, but not enough for the task to finish
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(RECORDS, 4)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 4)) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.flush();
     }

     while (countEvents(task) != 2) {

@@ -1115,7 +1137,7 @@
     // Insert remaining data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(RECORDS, 4)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 4)) {
         kafkaProducer.send(record).get();
       }
     }

@@ -1145,7 +1167,7 @@
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -1161,8 +1183,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 1L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             kafkaServer.consumerProperties(),
             true,
             true,

@@ -1176,7 +1198,7 @@
     final ListenableFuture<TaskStatus> future = runTask(task);

     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -1232,7 +1254,7 @@
     SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 6L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );

@@ -1249,8 +1271,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -1279,7 +1301,7 @@
   {
     // Insert data
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
         kafkaProducer.send(record).get();
       }
     }

@@ -1288,8 +1310,8 @@
         null,
         new KafkaIOConfig(
             "sequence0",
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 200L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 500L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
             kafkaServer.consumerProperties(),
             true,
             false,

@@ -1303,15 +1325,13 @@
     runTask(task);

     while (!task.getStatus().equals(KafkaIndexTask.Status.READING)) {
-      Thread.sleep(2000);
+      Thread.sleep(20);
     }

-    int i = 0;
-    while (i++ < 5) {
+    for (int i = 0; i < 5; i++) {
       Assert.assertEquals(task.getStatus(), KafkaIndexTask.Status.READING);
       // Offset should not be reset
       Assert.assertTrue(task.getCurrentOffsets().get(0) == 200L);
-      Thread.sleep(2000);
     }
   }
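
The last hunk above is where much of the saved wall-clock time in this test comes from: the status poll drops from a fixed 2-second sleep to 20 ms, and the Thread.sleep(2000) inside the assertion loop disappears entirely. The same idea as a small, self-contained helper (TestWaits and awaitCondition are hypothetical utilities, not part of this commit):

import java.util.function.BooleanSupplier;

public final class TestWaits
{
  private TestWaits() {}

  // Poll a condition with a short sleep instead of a long fixed one,
  // failing loudly if it never becomes true within the timeout.
  public static void awaitCondition(BooleanSupplier condition, long timeoutMs)
      throws InterruptedException
  {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(20);  // same short polling interval the test now uses
    }
  }
}

In the test above this would read roughly as awaitCondition(() -> task.getStatus() == KafkaIndexTask.Status.READING, 60_000); the test itself keeps the explicit while loop under its existing 60-second timeout.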
@@ -1384,7 +1404,7 @@
         handoffConditionTimeout,
         resetOffsetAutomatically
     );
-    return new KafkaIndexTask(
+    final KafkaIndexTask task = new KafkaIndexTask(
         taskId,
         null,
         DATA_SCHEMA,

@@ -1393,6 +1413,8 @@
         null,
         null
     );
+    task.setPollRetryMs(POLL_RETRY_MS);
+    return task;
   }

   private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate()

KafkaSupervisorTest.java

@@ -79,15 +79,16 @@ import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;

@@ -108,7 +109,7 @@ import static org.easymock.EasyMock.reset;
 public class KafkaSupervisorTest extends EasyMockSupport
 {
   private static final ObjectMapper objectMapper = new DefaultObjectMapper();
-  private static final String KAFKA_TOPIC = "testTopic";
+  private static final String TOPIC_PREFIX = "testTopic";
   private static final String DATASOURCE = "testDS";
   private static final int NUM_PARTITIONS = 3;
   private static final int TEST_CHAT_THREADS = 3;

@@ -116,12 +117,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");

-  private int numThreads;
-  private TestingCluster zkServer;
-  private TestBroker kafkaServer;
+  private static TestingCluster zkServer;
+  private static TestBroker kafkaServer;
+  private static String kafkaHost;
+  private static DataSchema dataSchema;
+  private static int topicPostfix;
+
+  private final int numThreads;
   private KafkaSupervisor supervisor;
-  private String kafkaHost;
-  private DataSchema dataSchema;
   private KafkaSupervisorTuningConfig tuningConfig;
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;

@@ -129,9 +133,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
   private KafkaIndexTaskClient taskClient;
   private TaskQueue taskQueue;
+  private String topic;

-  @Rule
-  public final TemporaryFolder tempFolder = new TemporaryFolder();
+  private static String getTopic()
+  {
+    return TOPIC_PREFIX + topicPostfix++;
+  }

   @Parameterized.Parameters(name = "numThreads = {0}")
   public static Iterable<Object[]> constructorFeeder()
@@ -144,8 +151,26 @@ public class KafkaSupervisorTest extends EasyMockSupport
     this.numThreads = numThreads;
   }

+  @BeforeClass
+  public static void setupClass() throws Exception
+  {
+    zkServer = new TestingCluster(1);
+    zkServer.start();
+
+    kafkaServer = new TestBroker(
+        zkServer.getConnectString(),
+        null,
+        1,
+        ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
+    );
+    kafkaServer.start();
+    kafkaHost = String.format("localhost:%d", kafkaServer.getPort());
+
+    dataSchema = getDataSchema(DATASOURCE);
+  }
+
   @Before
-  public void setUp() throws Exception
+  public void setupTest() throws Exception
   {
     taskStorage = createMock(TaskStorage.class);
     taskMaster = createMock(TaskMaster.class);

@@ -154,19 +179,6 @@
     taskClient = createMock(KafkaIndexTaskClient.class);
     taskQueue = createMock(TaskQueue.class);

-    zkServer = new TestingCluster(1);
-    zkServer.start();
-
-    kafkaServer = new TestBroker(
-        zkServer.getConnectString(),
-        tempFolder.newFolder(),
-        1,
-        ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
-    );
-    kafkaServer.start();
-    kafkaHost = String.format("localhost:%d", kafkaServer.getPort());
-
-    dataSchema = getDataSchema(DATASOURCE);
     tuningConfig = new KafkaSupervisorTuningConfig(
         1000,
         50000,

@@ -184,18 +196,24 @@
         TEST_HTTP_TIMEOUT,
         TEST_SHUTDOWN_TIMEOUT
     );
+
+    topic = getTopic();
   }

   @After
-  public void tearDown() throws Exception
+  public void tearDownTest() throws Exception
+  {
+    supervisor = null;
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
   {
     kafkaServer.close();
     kafkaServer = null;

     zkServer.stop();
     zkServer = null;
-    supervisor = null;
   }

   @Test
@@ -234,12 +252,12 @@
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());

-    Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic());
+    Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(2));
-    Assert.assertEquals(KAFKA_TOPIC, taskConfig.getEndPartitions().getTopic());
+    Assert.assertEquals(topic, taskConfig.getEndPartitions().getTopic());
     Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(Long.MAX_VALUE, (long) taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));

@@ -434,7 +452,7 @@
     expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
     expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new KafkaPartitions(KAFKA_TOPIC, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))
+            new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))
         )
     ).anyTimes();
     expect(taskQueue.add(capture(captured))).andReturn(true);

@@ -462,7 +480,7 @@
     expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
     expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
-            new KafkaPartitions(KAFKA_TOPIC, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))
+            new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))
         )
     ).anyTimes();
     replayAll();

@@ -931,7 +949,7 @@
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
     Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());

-    Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic());
+    Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
     Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(20L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(35L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(2));

@@ -994,7 +1012,7 @@
     Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
     Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
     Assert.assertEquals(1, (int) payload.getReplicas());
-    Assert.assertEquals(KAFKA_TOPIC, payload.getTopic());
+    Assert.assertEquals(topic, payload.getTopic());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());

@@ -1016,12 +1034,12 @@
     Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());

     // check that the new task was created with starting offsets matching where the publishing task finished
-    Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getStartPartitions().getTopic());
+    Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
     Assert.assertEquals(10L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(20L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(30L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(2));

-    Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getEndPartitions().getTopic());
+    Assert.assertEquals(topic, capturedTaskConfig.getEndPartitions().getTopic());
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(2));

@@ -1083,7 +1101,7 @@
     Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
     Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
     Assert.assertEquals(1, (int) payload.getReplicas());
-    Assert.assertEquals(KAFKA_TOPIC, payload.getTopic());
+    Assert.assertEquals(topic, payload.getTopic());
     Assert.assertEquals(0, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());

@@ -1105,12 +1123,12 @@
     Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());

     // check that the new task was created with starting offsets matching where the publishing task finished
-    Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getStartPartitions().getTopic());
+    Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
     Assert.assertEquals(10L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(0L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(30L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(2));

-    Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getEndPartitions().getTopic());
+    Assert.assertEquals(topic, capturedTaskConfig.getEndPartitions().getTopic());
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(0));
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(1));
     Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(2));

@@ -1188,7 +1206,7 @@
     Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
     Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
     Assert.assertEquals(1, (int) payload.getReplicas());
-    Assert.assertEquals(KAFKA_TOPIC, payload.getTopic());
+    Assert.assertEquals(topic, payload.getTopic());
     Assert.assertEquals(1, payload.getActiveTasks().size());
     Assert.assertEquals(1, payload.getPublishingTasks().size());

@@ -1500,7 +1518,6 @@
   @Test
   public void testResetNoTasks() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
     expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();

@@ -1508,6 +1525,7 @@
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();

+    supervisor = getSupervisor(1, 1, true, "PT1H", null, false);
     supervisor.start();
     supervisor.runInternal();
     verifyAll();

@@ -1540,17 +1558,17 @@
     Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture();

     KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
-        KAFKA_TOPIC,
+        topic,
         ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L)
     ));

     KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
-        KAFKA_TOPIC,
+        topic,
         ImmutableMap.of(1, 1000L, 2, 1000L)
     ));

     KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
-        KAFKA_TOPIC,
+        topic,
         ImmutableMap.of(0, 1000L)
     ));

@@ -1585,7 +1603,7 @@
     verifyAll();

     KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
-        KAFKA_TOPIC,
+        topic,
         ImmutableMap.of(1, 1000L, 2, 1000L)
     ));

@@ -1685,7 +1703,7 @@
       for (int j = 0; j < numEventsPerPartition; j++) {
        kafkaProducer.send(
             new ProducerRecord<byte[], byte[]>(
-                KAFKA_TOPIC,
+                topic,
                 i,
                 null,
                 String.format("event-%d", j).getBytes()

@@ -1706,7 +1724,7 @@
   )
   {
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
-        KAFKA_TOPIC,
+        topic,
         replicas,
         taskCount,
         new Period(duration),

@@ -1759,7 +1777,7 @@
     );
   }

-  private DataSchema getDataSchema(String dataSource)
+  private static DataSchema getDataSchema(String dataSource)
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
     dimensions.add(StringDimensionSchema.create("dim1"));

TestBroker.java

@@ -21,8 +21,10 @@ package io.druid.indexing.kafka.test;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
+import org.apache.commons.io.FileUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;

@@ -31,6 +33,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import scala.Some;
 import scala.collection.immutable.List$;

+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;

@@ -44,15 +47,22 @@ public class TestBroker implements Closeable
   private final String zookeeperConnect;
   private final File directory;
+  private final boolean directoryCleanup;
   private final int id;
   private final Map<String, String> brokerProps;

   private volatile KafkaServer server;

-  public TestBroker(String zookeeperConnect, File directory, int id, Map<String, String> brokerProps)
+  public TestBroker(
+      String zookeeperConnect,
+      @Nullable File directory,
+      int id,
+      Map<String, String> brokerProps
+  )
   {
     this.zookeeperConnect = zookeeperConnect;
-    this.directory = directory;
+    this.directory = directory == null ? Files.createTempDir() : directory;
+    this.directoryCleanup = directory == null;
     this.id = id;
     this.brokerProps = brokerProps == null ? ImmutableMap.<String, String>of() : brokerProps;
   }

@@ -117,5 +127,8 @@
       server.shutdown();
       server.awaitShutdown();
     }
+    if (directoryCleanup) {
+      FileUtils.forceDelete(directory);
+    }
   }
 }
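
With the constructor change above, TestBroker provisions its own scratch directory when the caller passes null and deletes it in close(), which is what lets the test classes drop their per-test TemporaryFolder rules. A possible use of the new contract, sketched only (zookeeperConnect would come from a running TestingCluster, ImmutableMap is Guava, and TestBroker is the Druid test utility shown above):

// Sketch: relies on the TestBroker class from this commit being on the classpath.
static void withThrowawayBroker(String zookeeperConnect) throws Exception
{
  try (TestBroker broker = new TestBroker(zookeeperConnect, null, 1, ImmutableMap.of("num.partitions", "2"))) {
    broker.start();
    // exercise the broker, e.g. via broker.newProducer() and broker.consumerProperties()
  }
  // close() stops the KafkaServer and, because no directory was supplied,
  // also forceDelete()s the temp dir the broker created for itself.
}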