make ITKafkaTest less non-deterministic (#3856)

This commit is contained in:
Himanshu 2017-01-17 16:52:51 -06:00 committed by Parag Jain
parent e550d48772
commit 7004f5d499
4 changed files with 154 additions and 76 deletions

View File

@ -21,7 +21,6 @@ package io.druid.tests.indexer;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
@ -48,7 +47,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/* /*
* This is a test for the kafka firehose. * This is a test for the kafka firehose.
@ -126,32 +124,6 @@ public class ITKafkaTest extends AbstractIndexerTest
throw new ISE(e, "could not create kafka topic"); throw new ISE(e, "could not create kafka topic");
} }
String indexerSpec;
// replace temp strings in indexer file
try {
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
indexerSpec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
.replaceAll(
"%%SHUTOFFTIME%%",
new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2 * MINUTES_TO_SEND)).toString()
);
LOG.info("indexerFile: [%s]\n", indexerSpec);
}
catch (Exception e) {
// log here so the message will appear in the console output
LOG.error("could not read indexer file [%s]", INDEXER_FILE);
throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
}
// start indexing task
taskID = indexer.submitTask(indexerSpec);
LOG.info("-------------SUBMITTED TASK");
// set up kafka producer // set up kafka producer
Properties properties = new Properties(); Properties properties = new Properties();
properties.put("metadata.broker.list", config.getKafkaHost()); properties.put("metadata.broker.list", config.getKafkaHost());
@ -169,21 +141,18 @@ public class ITKafkaTest extends AbstractIndexerTest
DateTime dt = new DateTime(zone); // timestamp to put on events DateTime dt = new DateTime(zone); // timestamp to put on events
dtFirst = dt; // timestamp of 1st event dtFirst = dt; // timestamp of 1st event
dtLast = dt; // timestamp of last event dtLast = dt; // timestamp of last event
// stop sending events when time passes this
DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
// these are used to compute the expected aggregations // these are used to compute the expected aggregations
int added = 0; int added = 0;
int num_events = 0; int num_events = 10;
// send data to kafka // send data to kafka
while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span for (int i = 0; i < num_events; i++) {
num_events++; added += i;
added += num_events;
// construct the event to send // construct the event to send
String event = String.format( String event = String.format(
event_template, event_template,
event_fmt.print(dt), num_events, 0, num_events event_fmt.print(dt), i, 0, i
); );
LOG.info("sending event: [%s]", event); LOG.info("sending event: [%s]", event);
try { try {
@ -195,16 +164,61 @@ public class ITKafkaTest extends AbstractIndexerTest
throw Throwables.propagate(ioe); throw Throwables.propagate(ioe);
} }
try {
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
}
catch (InterruptedException ex) { /* nothing */ }
dtLast = dt; dtLast = dt;
dt = new DateTime(zone); dt = new DateTime(zone);
} }
producer.close(); producer.close();
String indexerSpec;
// replace temp strings in indexer file
try {
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
indexerSpec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
.replaceAll("%%COUNT%%", Integer.toString(num_events));
LOG.info("indexerFile: [%s]\n", indexerSpec);
}
catch (Exception e) {
// log here so the message will appear in the console output
LOG.error("could not read indexer file [%s]", INDEXER_FILE);
throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
}
// start indexing task
taskID = indexer.submitTask(indexerSpec);
LOG.info("-------------SUBMITTED TASK");
// wait for the task to finish
indexer.waitUntilTaskCompletes (taskID, 20000, 30);
// wait for segments to be handed off
try {
RetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return coordinator.areSegmentsLoaded(DATASOURCE);
}
},
true,
30000,
10,
"Real-time generated segments loaded"
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
LOG.info("segments are present");
segmentsExist = true;
// put the timestamps into the query structure // put the timestamps into the query structure
String query_response_template = null; String query_response_template = null;
InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE); InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
@ -239,37 +253,6 @@ public class ITKafkaTest extends AbstractIndexerTest
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
// wait for segments to be handed off
try {
RetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return coordinator.areSegmentsLoaded(DATASOURCE);
}
},
true,
30000,
10,
"Real-time generated segments loaded"
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
LOG.info("segments are present");
segmentsExist = true;
// this query will be answered by historical
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
@AfterClass @AfterClass
@ -277,9 +260,6 @@ public class ITKafkaTest extends AbstractIndexerTest
{ {
LOG.info("teardown"); LOG.info("teardown");
// wait for the task to complete
indexer.waitUntilTaskCompletes(taskID);
// delete kafka topic // delete kafka topic
AdminUtils.deleteTopic(zkClient, TOPIC_NAME); AdminUtils.deleteTopic(zkClient, TOPIC_NAME);

View File

@ -48,8 +48,8 @@
"ioConfig" : { "ioConfig" : {
"type" : "realtime", "type" : "realtime",
"firehose": { "firehose": {
"type": "timed", "type": "fixedCount",
"shutoffTime": "%%SHUTOFFTIME%%", "count": "%%COUNT%%",
"delegate": { "delegate": {
"type": "kafka-0.8", "type": "kafka-0.8",
"consumerProps": { "consumerProps": {

View File

@ -27,6 +27,7 @@ import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory; import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@ -54,7 +55,8 @@ public class FirehoseModule implements DruidModule
new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(IrcFirehoseFactory.class, "irc"),
new NamedType(LocalFirehoseFactory.class, "local"), new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"), new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining") new NamedType(CombiningFirehoseFactory.class, "combining"),
new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
) )
); );
} }

View File

@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import java.io.IOException;
/**
* Firehose to give out only first n events from the delegate firehose.
*/
public class FixedCountFirehoseFactory implements FirehoseFactory
{
private final FirehoseFactory delegate;
private final int count;
@JsonCreator
public FixedCountFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegate,
@JsonProperty("count") int count
)
{
this.delegate = delegate;
this.count = count;
}
@JsonProperty
public FirehoseFactory getDelegate()
{
return delegate;
}
@JsonProperty
public int getCount()
{
return count;
}
@Override
public Firehose connect(final InputRowParser parser) throws IOException
{
return new Firehose()
{
private int i = 0;
private Firehose delegateFirehose = delegate.connect(parser);
@Override
public boolean hasMore()
{
return i < count && delegateFirehose.hasMore();
}
@Override
public InputRow nextRow()
{
Preconditions.checkArgument(i++ < count, "Max events limit reached.");
return delegateFirehose.nextRow();
}
@Override
public Runnable commit()
{
return delegateFirehose.commit();
}
@Override
public void close() throws IOException
{
delegateFirehose.close();
}
};
}
}