ITKafkaIndexingServiceTest fixes (#3872)
* remove wait between sends, wait for ingestion to complete before querying, send fixed number of events, more fixes
* handle interrupted exception
* remove while
* review comments
This commit is contained in:
parent efb1b40fe0
commit 515caa8a85
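For context on the changes described above, the revised send path has roughly this shape: send a fixed number of events with no per-event sleep, then wait a bounded time for the Kafka indexing tasks to consume them before querying, and restore the interrupt flag instead of swallowing InterruptedException. The sketch below is illustrative only; the broker address, the buildEvent helper, and the class name are placeholders, not the test's actual members.

// Minimal sketch of the fixed-count send loop; not the test's exact code.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FixedCountSendSketch
{
  private static final String TOPIC_NAME = "kafka_indexing_service_topic";
  private static final int NUM_EVENTS_TO_SEND = 60;
  private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;

  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // send a fixed number of events instead of sending for a fixed wall-clock span
      for (int i = 1; i <= NUM_EVENTS_TO_SEND; i++) {
        producer.send(new ProducerRecord<>(TOPIC_NAME, buildEvent(i)));
      }
    }

    // give the indexing tasks a bounded window to consume before querying,
    // and re-interrupt instead of ignoring InterruptedException
    try {
      Thread.sleep(WAIT_TIME_MILLIS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  private static String buildEvent(int sequence)
  {
    return "{\"added\": " + sequence + "}"; // stand-in JSON payload
  }
}

Per the commit message, the diff below follows this structure: the time-based while loop and the per-event DELAY_BETWEEN_EVENTS_SECS wait give way to NUM_EVENTS_TO_SEND and WAIT_TIME_MILLIS, and the remaining Thread.sleep(WAIT_TIME_MILLIS) handles InterruptedException by re-interrupting and failing the test.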
@@ -21,7 +21,6 @@ package io.druid.tests.indexer;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
@@ -56,12 +55,12 @@ import java.util.concurrent.Callable;
public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
{
  private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
  private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
  private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
  private static final String DATASOURCE = "kafka_indexing_service_test";
  private static final String TOPIC_NAME = "kafka_indexing_service_topic";
  private static final int MINUTES_TO_SEND = 4;
  private static final int NUM_EVENTS_TO_SEND = 60;
  private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;

  // We'll fill in the current time and numbers for added, deleted and changed
  // before sending the event.
@@ -163,15 +162,13 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
    DateTime dt = new DateTime(zone); // timestamp to put on events
    dtFirst = dt; // timestamp of 1st event
    dtLast = dt; // timestamp of last event
    // stop sending events when time passes this
    DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);

    // these are used to compute the expected aggregations
    int added = 0;
    int num_events = 0;

    // send data to kafka
    while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
    while (num_events < NUM_EVENTS_TO_SEND) {
      num_events++;
      added += num_events;
      // construct the event to send
@@ -184,16 +181,22 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
        throw Throwables.propagate(ioe);
      }

      try {
        Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
      }
      catch (InterruptedException ex) { /* nothing */ }
      dtLast = dt;
      dt = new DateTime(zone);
    }

    producer.close();

    LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
    try {
      Thread.sleep(WAIT_TIME_MILLIS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }


    InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
    if (null == is) {
      throw new ISE("could not open query file: %s", QUERIES_FILE);
@@ -214,7 +217,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
        .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
        .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
        .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
        .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
        .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
        .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
        .replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
        .replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
@@ -227,8 +230,23 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
      throw Throwables.propagate(e);
    }

    LOG.info("Shutting down Kafka Supervisor");
    indexer.shutdownSupervisor(supervisorId);

    // wait for all kafka indexing tasks to finish
    LOG.info("Waiting for all kafka indexing tasks to finish");
    RetryUtil.retryUntilTrue(
        new Callable<Boolean>()
        {
          @Override
          public Boolean call() throws Exception
          {
            return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
                .size()) == 0;
          }
        }, "Waiting for Tasks Completion"
    );

    // wait for segments to be handed off
    try {
      RetryUtil.retryUntil(
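The diff is truncated above at the segment-handoff wait, which appears to follow the same retry pattern as the task-completion wait: poll a boolean condition until it holds or a bound is exceeded. A generic sketch of that poll-until-true idiom follows; RetryHelper and its retry count and delay are illustrative stand-ins, not Druid's RetryUtil API.

// Illustrative poll-until-true helper; names and bounds are assumptions, not Druid's RetryUtil.
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public final class RetryHelper
{
  private RetryHelper()
  {
  }

  public static void retryUntilTrue(Callable<Boolean> condition, String taskDescription) throws Exception
  {
    final int maxRetries = 30;    // assumed retry bound
    final long delaySeconds = 10; // assumed poll interval

    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      if (Boolean.TRUE.equals(condition.call())) {
        return; // condition met, e.g. no pending/running/waiting tasks remain
      }
      System.out.printf("[%s] not satisfied yet, retry %d/%d%n", taskDescription, attempt, maxRetries);
      TimeUnit.SECONDS.sleep(delaySeconds);
    }
    throw new IllegalStateException("Condition never satisfied: " + taskDescription);
  }
}

In the test, the callable for the first wait checks that the indexer reports zero pending, running, and waiting tasks; the segment-handoff check that follows is cut off in the diff above.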