integration-test update - use KafkaProducer in ITKafkaTest and add retries to task submission (#3888)

* use KafkaProducer instead of old Producer in ITKafkaTest

* add retries to OverlordResourceTestClient.submitTask(..)
This commit is contained in:
Himanshu 2017-01-27 09:38:17 -06:00 committed by Gian Merlino
parent d3a3b7ba0c
commit 17c6512ad8
2 changed files with 49 additions and 34 deletions

View File

@ -22,6 +22,7 @@ package io.druid.testing.clients;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
@ -31,6 +32,7 @@ import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.TestClient; import io.druid.testing.guice.TestClient;
@ -83,32 +85,43 @@ public class OverlordResourceTestClient
} }
} }
public String submitTask(String task) public String submitTask(final String task)
{ {
try { try {
StatusResponseHolder response = httpClient.go( return RetryUtils.retry(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "task")) new Callable<String>()
.setContent(
"application/json",
task.getBytes()
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting task to indexer response [%s %s]",
response.getStatus(),
response.getContent()
);
}
Map<String, String> responseData = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, String>>()
{ {
} @Override
public String call() throws Exception
{
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "task"))
.setContent(
"application/json",
task.getBytes()
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting task to indexer response [%s %s]",
response.getStatus(),
response.getContent()
);
}
Map<String, String> responseData = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, String>>()
{
}
);
String taskID = responseData.get("task");
LOG.info("Submitted task with TaskID[%s]", taskID);
return taskID;
}
},
Predicates.<Throwable>alwaysTrue(),
5
); );
String taskID = responseData.get("task");
LOG.info("Submitted task with TaskID[%s]", taskID);
return taskID;
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -29,12 +29,12 @@ import io.druid.testing.utils.RetryUtil;
import io.druid.testing.utils.TestQueryHelper; import io.druid.testing.utils.TestQueryHelper;
import kafka.admin.AdminUtils; import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException; import kafka.common.TopicExistsException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.ZKStringSerializer$; import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormat;
@ -126,13 +126,16 @@ public class ITKafkaTest extends AbstractIndexerTest
// set up kafka producer // set up kafka producer
Properties properties = new Properties(); Properties properties = new Properties();
properties.put("metadata.broker.list", config.getKafkaHost()); properties.put("bootstrap.servers", config.getKafkaHost());
LOG.info("kafka host: [%s]", config.getKafkaHost()); LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("acks", "all");
properties.put("request.required.acks", "1"); properties.put("retries", "3");
properties.put("producer.type", "sync"); // use sync producer for deterministic test
ProducerConfig producerConfig = new ProducerConfig(properties); KafkaProducer<String, String> producer = new KafkaProducer<>(
Producer<String, String> producer = new Producer<String, String>(producerConfig); properties,
new StringSerializer(),
new StringSerializer()
);
DateTimeZone zone = DateTimeZone.forID("UTC"); DateTimeZone zone = DateTimeZone.forID("UTC");
// format for putting into events // format for putting into events
@ -157,8 +160,7 @@ public class ITKafkaTest extends AbstractIndexerTest
LOG.info("sending event: [%s]", event); LOG.info("sending event: [%s]", event);
try { try {
// Send event to kafka // Send event to kafka
KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC_NAME, event); producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
producer.send(message);
} }
catch (Exception ioe) { catch (Exception ioe) {
throw Throwables.propagate(ioe); throw Throwables.propagate(ioe);