update KafkaSupervisor to make HTTP requests to tasks in parallel where possible (#3452)

David Lim 2016-09-20 11:21:15 -06:00 committed by Nishant
parent 54ec4dd584
commit 96fcca18ea
23 changed files with 2610 additions and 660 deletions


@@ -105,10 +105,10 @@ A sample supervisor spec is shown below:
|--------|-----------|---------|
|`type`|The supervisor type; this should always be `kafka`.|yes|
|`dataSchema`|The schema that will be used by the Kafka indexing task during ingestion, see [Ingestion Spec](../../ingestion/index.html).|yes|
|`tuningConfig`|A KafkaTuningConfig that will be provided to indexing tasks, see below.|no|
|`ioConfig`|A KafkaSupervisorIOConfig to configure the supervisor, see below.|yes|
|`tuningConfig`|A KafkaSupervisorTuningConfig to configure the supervisor and indexing tasks, see below.|no|
|`ioConfig`|A KafkaSupervisorIOConfig to configure the supervisor and indexing tasks, see below.|yes|
### KafkaTuningConfig
### KafkaSupervisorTuningConfig
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
@@ -123,6 +123,10 @@ The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == false)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)|
|`httpTimeout`|ISO8601 Period|How long to wait for an HTTP response from an indexing task.|no (default == PT10S)|
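
For illustration, a minimal `tuningConfig` exercising the new supervisor options might look like the following (the values are arbitrary examples, not recommendations):

```json
{
  "type": "kafka",
  "maxRowsInMemory": 75000,
  "workerThreads": 2,
  "chatThreads": 4,
  "chatRetries": 8,
  "httpTimeout": "PT10S"
}
```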
#### IndexSpec


@@ -25,6 +25,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
@@ -32,6 +36,7 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
@@ -51,12 +56,14 @@ import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;
public class KafkaIndexTaskClient
{
public class NoTaskLocationException extends RuntimeException
{
public NoTaskLocationException(String message) {
public NoTaskLocationException(String message)
{
super(message);
}
}
@@ -76,33 +83,89 @@ public class KafkaIndexTaskClient
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final TaskInfoProvider taskInfoProvider;
private final Duration httpTimeout;
private final RetryPolicyFactory retryPolicyFactory;
private final ListeningExecutorService executorService;
private final long numRetries;
public KafkaIndexTaskClient(HttpClient httpClient, ObjectMapper jsonMapper, TaskInfoProvider taskInfoProvider)
public KafkaIndexTaskClient(
HttpClient httpClient,
ObjectMapper jsonMapper,
TaskInfoProvider taskInfoProvider,
String dataSource,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
this.httpClient = httpClient;
this.jsonMapper = jsonMapper;
this.taskInfoProvider = taskInfoProvider;
this.httpTimeout = httpTimeout;
this.numRetries = numRetries;
this.retryPolicyFactory = createRetryPolicyFactory();
this.executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
numThreads,
String.format(
"KafkaIndexTaskClient-%s-%%d",
dataSource
)
)
);
}
public void stop(String id, boolean publish)
public void close()
{
submitRequest(id, HttpMethod.POST, "stop", publish ? "publish=true" : null, true);
executorService.shutdownNow();
}
public void resume(String id)
public boolean stop(final String id, final boolean publish)
{
submitRequest(id, HttpMethod.POST, "resume", null, true);
log.debug("Stop task[%s] publish[%s]", id, publish);
try {
final FullResponseHolder response = submitRequest(
id, HttpMethod.POST, "stop", publish ? "publish=true" : null, true
);
return response.getStatus().getCode() / 100 == 2;
}
catch (NoTaskLocationException e) {
return false;
}
catch (TaskNotRunnableException e) {
log.info("Task [%s] couldn't be stopped because it is no longer running", id);
return true;
}
catch (Exception e) {
log.warn(e, "Exception while stopping task [%s]", id);
return false;
}
}
public Map<Integer, Long> pause(String id)
public boolean resume(final String id)
{
log.debug("Resume task[%s]", id);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.POST, "resume", null, true);
return response.getStatus().getCode() / 100 == 2;
}
catch (NoTaskLocationException e) {
return false;
}
}
public Map<Integer, Long> pause(final String id)
{
return pause(id, 0);
}
public Map<Integer, Long> pause(String id, long timeout)
public Map<Integer, Long> pause(final String id, final long timeout)
{
log.debug("Pause task[%s] timeout[%d]", id, timeout);
try {
final FullResponseHolder response = submitRequest(
id,
@@ -128,72 +191,99 @@ public class KafkaIndexTaskClient
} else {
final long sleepTime = delay.getMillis();
log.info(
"Still waiting for task [%s] to pause; will try again in [%s]", id, new Duration(sleepTime).toString()
"Still waiting for task [%s] to pause; will try again in [%s]",
id,
new Duration(sleepTime).toString()
);
Thread.sleep(sleepTime);
}
}
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
}
catch (IOException | InterruptedException e) {
throw Throwables.propagate(e);
}
}
public KafkaIndexTask.Status getStatus(String id)
public KafkaIndexTask.Status getStatus(final String id)
{
log.debug("GetStatus task[%s]", id);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "status", null, true);
return jsonMapper.readValue(response.getContent(), KafkaIndexTask.Status.class);
}
catch (NoTaskLocationException e) {
return KafkaIndexTask.Status.NOT_STARTED;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public DateTime getStartTime(String id)
public DateTime getStartTime(final String id)
{
log.debug("GetStartTime task[%s]", id);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "time/start", null, true);
return response.getContent() == null || response.getContent().isEmpty()
? null
: jsonMapper.readValue(response.getContent(), DateTime.class);
}
catch (NoTaskLocationException e) {
return null;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public Map<Integer, Long> getCurrentOffsets(String id, boolean retry)
public Map<Integer, Long> getCurrentOffsets(final String id, final boolean retry)
{
log.debug("GetCurrentOffsets task[%s] retry[%s]", id, retry);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "offsets/current", null, retry);
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<Integer, Long>>() {});
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public Map<Integer, Long> getEndOffsets(String id)
public Map<Integer, Long> getEndOffsets(final String id)
{
log.debug("GetEndOffsets task[%s]", id);
try {
final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "offsets/end", null, true);
return jsonMapper.readValue(response.getContent(), new TypeReference<Map<Integer, Long>>() {});
}
catch (NoTaskLocationException e) {
return ImmutableMap.of();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public void setEndOffsets(String id, Map<Integer, Long> endOffsets)
public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets)
{
setEndOffsets(id, endOffsets, false);
return setEndOffsets(id, endOffsets, false);
}
public void setEndOffsets(String id, Map<Integer, Long> endOffsets, boolean resume)
public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets, final boolean resume)
{
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s]", id, endOffsets, resume);
try {
submitRequest(
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"offsets/end",
@@ -201,24 +291,151 @@ public class KafkaIndexTaskClient
jsonMapper.writeValueAsBytes(endOffsets),
true
);
return response.getStatus().getCode() / 100 == 2;
}
catch (NoTaskLocationException e) {
return false;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public ListenableFuture<Boolean> stopAsync(final String id, final boolean publish)
{
return executorService.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return stop(id, publish);
}
}
);
}
public ListenableFuture<Boolean> resumeAsync(final String id)
{
return executorService.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return resume(id);
}
}
);
}
public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id)
{
return pauseAsync(id, 0);
}
public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout)
{
return executorService.submit(
new Callable<Map<Integer, Long>>()
{
@Override
public Map<Integer, Long> call() throws Exception
{
return pause(id, timeout);
}
}
);
}
public ListenableFuture<KafkaIndexTask.Status> getStatusAsync(final String id)
{
return executorService.submit(
new Callable<KafkaIndexTask.Status>()
{
@Override
public KafkaIndexTask.Status call() throws Exception
{
return getStatus(id);
}
}
);
}
public ListenableFuture<DateTime> getStartTimeAsync(final String id)
{
return executorService.submit(
new Callable<DateTime>()
{
@Override
public DateTime call() throws Exception
{
return getStartTime(id);
}
}
);
}
public ListenableFuture<Map<Integer, Long>> getCurrentOffsetsAsync(final String id, final boolean retry)
{
return executorService.submit(
new Callable<Map<Integer, Long>>()
{
@Override
public Map<Integer, Long> call() throws Exception
{
return getCurrentOffsets(id, retry);
}
}
);
}
public ListenableFuture<Map<Integer, Long>> getEndOffsetsAsync(final String id)
{
return executorService.submit(
new Callable<Map<Integer, Long>>()
{
@Override
public Map<Integer, Long> call() throws Exception
{
return getEndOffsets(id);
}
}
);
}
public ListenableFuture<Boolean> setEndOffsetsAsync(final String id, final Map<Integer, Long> endOffsets)
{
return setEndOffsetsAsync(id, endOffsets, false);
}
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id, final Map<Integer, Long> endOffsets, final boolean resume
)
{
return executorService.submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return setEndOffsets(id, endOffsets, resume);
}
}
);
}
@VisibleForTesting
RetryPolicyFactory createRetryPolicyFactory()
{
// Retries for about a minute before giving up; this should be long enough to handle any temporary unresponsiveness
// such as network issues, if a task is still in the process of starting up, or if the task is in the middle of
// persisting to disk and doesn't respond immediately.
// Retries [numRetries] times before giving up; this should be set long enough to handle any temporary
// unresponsiveness such as network issues, if a task is still in the process of starting up, or if the task is in
// the middle of persisting to disk and doesn't respond immediately.
return new RetryPolicyFactory(
new RetryPolicyConfig()
.setMinWait(Period.seconds(2))
.setMaxWait(Period.seconds(8))
.setMaxRetryCount(8)
.setMaxWait(Period.seconds(10))
.setMaxRetryCount(numRetries)
);
}
@@ -246,6 +463,8 @@ public class KafkaIndexTaskClient
while (true) {
FullResponseHolder response = null;
Request request = null;
TaskLocation location = TaskLocation.unknown();
String path = String.format("%s/%s/%s", BASE_PATH, id, pathSuffix);
Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(id);
if (!status.isPresent() || !status.get().isRunnable()) {
@@ -253,9 +472,8 @@ public class KafkaIndexTaskClient
}
try {
TaskLocation location = taskInfoProvider.getTaskLocation(id);
location = taskInfoProvider.getTaskLocation(id);
if (location.equals(TaskLocation.unknown())) {
log.info("No TaskLocation available for task [%s], this task may not have been assigned to a worker yet", id);
throw new NoTaskLocationException(String.format("No TaskLocation available for task [%s]", id));
}
@@ -264,15 +482,7 @@ public class KafkaIndexTaskClient
checkConnection(location.getHost(), location.getPort());
try {
URI serviceUri = new URI(
"http",
null,
location.getHost(),
location.getPort(),
String.format("%s/%s/%s", BASE_PATH, id, pathSuffix),
query,
null
);
URI serviceUri = new URI("http", null, location.getHost(), location.getPort(), path, query, null);
request = new Request(method, serviceUri.toURL());
// used to validate that we are talking to the correct worker
@@ -282,7 +492,8 @@ public class KafkaIndexTaskClient
request.setContent(MediaType.APPLICATION_JSON, content);
}
response = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get();
log.debug("HTTP %s: %s", method.getName(), serviceUri.toString());
response = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8), httpTimeout).get();
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
@@ -324,17 +535,26 @@ public class KafkaIndexTaskClient
delay = retryPolicy.getAndIncrementRetryDelay();
}
if (!retry || delay == null) {
String urlForLog = (request != null
? request.getUrl().toString()
: String.format("http://%s:%d%s", location.getHost(), location.getPort(), path));
if (!retry) {
// if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was
// for informational purposes only) so don't log a scary stack trace
log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage());
Throwables.propagate(e);
} else if (delay == null) {
log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog);
Throwables.propagate(e);
} else {
try {
final long sleepTime = delay.getMillis();
log.debug(
"Bad response HTTP [%d] from %s; will try again in [%s] (body: [%s])",
(response != null ? response.getStatus().getCode() : 0),
(request != null ? request.getUrl() : "-"),
"Bad response HTTP [%s] from [%s]; will try again in [%s] (body/exception: [%s])",
(response != null ? response.getStatus().getCode() : "no response"),
urlForLog,
new Duration(sleepTime).toString(),
(response != null ? response.getContent() : "[empty]")
(response != null ? response.getContent() : e.getMessage())
);
Thread.sleep(sleepTime);
}
@@ -343,6 +563,15 @@ public class KafkaIndexTaskClient
}
}
}
catch (NoTaskLocationException e) {
log.info("No TaskLocation available for task [%s], this task may not have been assigned to a worker yet or "
+ "may have already completed", id);
throw e;
}
catch (Exception e) {
log.warn(e, "Exception while sending request");
throw e;
}
}
}
}
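
The `*Async` variants above are what let the supervisor fan HTTP requests out to many tasks at once and then gather the results. A minimal sketch of that pattern (hypothetical caller code, not part of this commit):

```java
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.indexing.kafka.KafkaIndexTaskClient;

import java.util.List;

public class ParallelStatusCheck
{
  // Submits one status request per task to the client's internal executor,
  // then blocks until every response (or failure) has come back.
  public static List<KafkaIndexTask.Status> getStatuses(
      KafkaIndexTaskClient client,
      List<String> taskIds
  ) throws Exception
  {
    List<ListenableFuture<KafkaIndexTask.Status>> futures = Lists.newArrayList();
    for (String taskId : taskIds) {
      futures.add(client.getStatusAsync(taskId));
    }
    return Futures.allAsList(futures).get();
  }
}
```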


@@ -25,6 +25,7 @@ import com.metamx.http.client.HttpClient;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.indexing.common.TaskInfoProvider;
import org.joda.time.Duration;
public class KafkaIndexTaskClientFactory
{
@@ -38,8 +39,22 @@ public class KafkaIndexTaskClientFactory
this.mapper = mapper;
}
public KafkaIndexTaskClient build(TaskInfoProvider taskInfoProvider)
public KafkaIndexTaskClient build(
TaskInfoProvider taskInfoProvider,
String dataSource,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
return new KafkaIndexTaskClient(httpClient, mapper, taskInfoProvider);
return new KafkaIndexTaskClient(
httpClient,
mapper,
taskInfoProvider,
dataSource,
numThreads,
httpTimeout,
numRetries
);
}
}


@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import io.druid.initialization.DruidModule;
import java.util.List;
@@ -40,7 +41,7 @@ public class KafkaIndexTaskModule implements DruidModule
new NamedType(KafkaIndexTask.class, "index_kafka"),
new NamedType(KafkaDataSourceMetadata.class, "kafka"),
new NamedType(KafkaIOConfig.class, "kafka"),
new NamedType(KafkaTuningConfig.class, "kafka"),
new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
new NamedType(KafkaSupervisorSpec.class, "kafka")
)
);


@@ -76,6 +76,21 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
: handoffConditionTimeout;
}
public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
{
return new KafkaTuningConfig(
config.maxRowsInMemory,
config.maxRowsPerSegment,
config.intermediatePersistPeriod,
config.basePersistDirectory,
config.maxPendingPersists,
config.indexSpec,
config.buildV9Directly,
config.reportParseExceptions,
config.handoffConditionTimeout
);
}
@JsonProperty
public int getMaxRowsInMemory()
{
@@ -159,4 +174,78 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
handoffConditionTimeout
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaTuningConfig that = (KafkaTuningConfig) o;
if (maxRowsInMemory != that.maxRowsInMemory) {
return false;
}
if (maxRowsPerSegment != that.maxRowsPerSegment) {
return false;
}
if (maxPendingPersists != that.maxPendingPersists) {
return false;
}
if (buildV9Directly != that.buildV9Directly) {
return false;
}
if (reportParseExceptions != that.reportParseExceptions) {
return false;
}
if (handoffConditionTimeout != that.handoffConditionTimeout) {
return false;
}
if (intermediatePersistPeriod != null
? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod)
: that.intermediatePersistPeriod != null) {
return false;
}
if (basePersistDirectory != null
? !basePersistDirectory.equals(that.basePersistDirectory)
: that.basePersistDirectory != null) {
return false;
}
return !(indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null);
}
@Override
public int hashCode()
{
int result = maxRowsInMemory;
result = 31 * result + maxRowsPerSegment;
result = 31 * result + (intermediatePersistPeriod != null ? intermediatePersistPeriod.hashCode() : 0);
result = 31 * result + (basePersistDirectory != null ? basePersistDirectory.hashCode() : 0);
result = 31 * result + maxPendingPersists;
result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0);
result = 31 * result + (buildV9Directly ? 1 : 0);
result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
return result;
}
@Override
public String toString()
{
return "KafkaTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxRowsPerSegment=" + maxRowsPerSegment +
", intermediatePersistPeriod=" + intermediatePersistPeriod +
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", indexSpec=" + indexSpec +
", buildV9Directly=" + buildV9Directly +
", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout +
'}';
}
}


@@ -136,6 +136,23 @@ public class KafkaSupervisorIOConfig
return lateMessageRejectionPeriod;
}
@Override
public String toString()
{
return "KafkaSupervisorIOConfig{" +
"topic='" + topic + '\'' +
", replicas=" + replicas +
", taskCount=" + taskCount +
", taskDuration=" + taskDuration +
", consumerProperties=" + consumerProperties +
", startDelay=" + startDelay +
", period=" + period +
", useEarliestOffset=" + useEarliestOffset +
", completionTimeout=" + completionTimeout +
", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod +
'}';
}
private static Duration defaultDuration(final Period period, final String theDefault)
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();


@@ -21,80 +21,14 @@ package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
public class KafkaSupervisorReport extends SupervisorReport
{
public class TaskReportData
{
private final String id;
private final Map<Integer, Long> startingOffsets;
private final Map<Integer, Long> currentOffsets;
private final DateTime startTime;
private final Long remainingSeconds;
public TaskReportData(
String id,
Map<Integer, Long> startingOffsets,
Map<Integer, Long> currentOffsets,
DateTime startTime,
Long remainingSeconds
)
{
this.id = id;
this.startingOffsets = startingOffsets;
this.currentOffsets = currentOffsets;
this.startTime = startTime;
this.remainingSeconds = remainingSeconds;
}
@JsonProperty
public String getId()
{
return id;
}
@JsonProperty
public Map<Integer, Long> getStartingOffsets()
{
return startingOffsets;
}
@JsonProperty
public Map<Integer, Long> getCurrentOffsets()
{
return currentOffsets;
}
@JsonProperty
public DateTime getStartTime()
{
return startTime;
}
@JsonProperty
public Long getRemainingSeconds()
{
return remainingSeconds;
}
@Override
public String toString()
{
return "{" +
"id='" + id + '\'' +
(startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") +
(currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") +
", startTime=" + startTime +
", remainingSeconds=" + remainingSeconds +
'}';
}
}
public class KafkaSupervisorReportPayload
{
private final String dataSource;
@@ -200,26 +134,15 @@ public class KafkaSupervisorReport extends SupervisorReport
return payload;
}
public void addActiveTask(
String id,
Map<Integer, Long> startingOffsets,
Map<Integer, Long> currentOffsets,
DateTime startTime,
Long remainingSeconds
)
public void addTask(TaskReportData data)
{
payload.activeTasks.add(new TaskReportData(id, startingOffsets, currentOffsets, startTime, remainingSeconds));
if (data.getType().equals(TaskReportData.TaskType.ACTIVE)) {
payload.activeTasks.add(data);
} else if (data.getType().equals(TaskReportData.TaskType.PUBLISHING)) {
payload.publishingTasks.add(data);
} else {
throw new IAE("Unknown task type [%s]", data.getType().name());
}
public void addPublishingTask(
String id,
Map<Integer, Long> startingOffsets,
Map<Integer, Long> currentOffsets,
DateTime startTime,
Long remainingSeconds
)
{
payload.publishingTasks.add(new TaskReportData(id, startingOffsets, currentOffsets, startTime, remainingSeconds));
}
@Override


@@ -25,9 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.druid.guice.annotations.Json;
import io.druid.indexing.kafka.KafkaIndexTaskClient;
import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorage;
@ -38,7 +36,7 @@ import io.druid.segment.indexing.DataSchema;
public class KafkaSupervisorSpec implements SupervisorSpec
{
private final DataSchema dataSchema;
private final KafkaTuningConfig tuningConfig;
private final KafkaSupervisorTuningConfig tuningConfig;
private final KafkaSupervisorIOConfig ioConfig;
private final TaskStorage taskStorage;
@@ -50,7 +48,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
@JsonCreator
public KafkaSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
@JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@@ -62,7 +60,21 @@ public class KafkaSupervisorSpec implements SupervisorSpec
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
this.tuningConfig = tuningConfig != null
? tuningConfig
: new KafkaTuningConfig(null, null, null, null, null, null, null, null, null);
: new KafkaSupervisorTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.taskStorage = taskStorage;
@@ -79,7 +91,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
}
@JsonProperty
public KafkaTuningConfig getTuningConfig()
public KafkaSupervisorTuningConfig getTuningConfig()
{
return tuningConfig;
}
@@ -108,4 +120,14 @@ public class KafkaSupervisorSpec implements SupervisorSpec
this
);
}
@Override
public String toString()
{
return "KafkaSupervisorSpec{" +
"dataSchema=" + dataSchema +
", tuningConfig=" + tuningConfig +
", ioConfig=" + ioConfig +
'}';
}
}


@@ -0,0 +1,119 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.segment.IndexSpec;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.io.File;
public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
{
private final Integer workerThreads;
private final Integer chatThreads;
private final Long chatRetries;
private final Duration httpTimeout;
public KafkaSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout
)
{
super(
maxRowsInMemory,
maxRowsPerSegment,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
);
this.workerThreads = workerThreads;
this.chatThreads = chatThreads;
this.chatRetries = (chatRetries != null ? chatRetries : 8);
this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
}
@JsonProperty
public Integer getWorkerThreads()
{
return workerThreads;
}
@JsonProperty
public Integer getChatThreads()
{
return chatThreads;
}
@JsonProperty
public Long getChatRetries()
{
return chatRetries;
}
@JsonProperty
public Duration getHttpTimeout()
{
return httpTimeout;
}
@Override
public String toString()
{
return "KafkaSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", buildV9Directly=" + getBuildV9Directly() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
'}';
}
private static Duration defaultDuration(final Period period, final String theDefault)
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();
}
}


@@ -0,0 +1,110 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import java.util.Map;
public class TaskReportData
{
public enum TaskType
{
ACTIVE, PUBLISHING, UNKNOWN
}
private final String id;
private final Map<Integer, Long> startingOffsets;
private final DateTime startTime;
private final Long remainingSeconds;
private final TaskType type;
private Map<Integer, Long> currentOffsets;
public TaskReportData(
String id,
Map<Integer, Long> startingOffsets,
Map<Integer, Long> currentOffsets,
DateTime startTime,
Long remainingSeconds,
TaskType type
)
{
this.id = id;
this.startingOffsets = startingOffsets;
this.currentOffsets = currentOffsets;
this.startTime = startTime;
this.remainingSeconds = remainingSeconds;
this.type = type;
}
@JsonProperty
public String getId()
{
return id;
}
@JsonProperty
public Map<Integer, Long> getStartingOffsets()
{
return startingOffsets;
}
@JsonProperty
public Map<Integer, Long> getCurrentOffsets()
{
return currentOffsets;
}
public void setCurrentOffsets(Map<Integer, Long> currentOffsets)
{
this.currentOffsets = currentOffsets;
}
@JsonProperty
public DateTime getStartTime()
{
return startTime;
}
@JsonProperty
public Long getRemainingSeconds()
{
return remainingSeconds;
}
@JsonProperty
public TaskType getType()
{
return type;
}
@Override
public String toString()
{
return "{" +
"id='" + id + '\'' +
(startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") +
(currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") +
", startTime=" + startTime +
", remainingSeconds=" + remainingSeconds +
'}';
}
}


@@ -21,89 +21,127 @@ package io.druid.indexing.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.Capture;
import org.easymock.EasyMockRunner;
import org.easymock.CaptureType;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.reset;
@RunWith(EasyMockRunner.class)
@RunWith(Parameterized.class)
public class KafkaIndexTaskClientTest extends EasyMockSupport
{
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
private static final String TEST_ID = "test-id";
private static final List<String> TEST_IDS = Lists.newArrayList("test-id1", "test-id2", "test-id3", "test-id4");
private static final String TEST_HOST = "test-host";
private static final int TEST_PORT = 1234;
private static final String TEST_DATASOURCE = "test-datasource";
private static final Duration TEST_HTTP_TIMEOUT = new Duration(5000);
private static final long TEST_NUM_RETRIES = 0;
private static final String URL_FORMATTER = "http://%s:%d/druid/worker/v1/chat/%s/%s";
@Mock
private int numThreads;
private HttpClient httpClient;
@Mock
private TaskInfoProvider taskInfoProvider;
@Mock
private FullResponseHolder responseHolder;
@Mock
private HttpResponse response;
@Mock
private HttpHeaders headers;
private KafkaIndexTaskClient client;
@Parameterized.Parameters(name = "numThreads = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{1}, new Object[]{8});
}
public KafkaIndexTaskClientTest(int numThreads)
{
this.numThreads = numThreads;
}
@Before
public void setUp() throws Exception
{
httpClient = createMock(HttpClient.class);
taskInfoProvider = createMock(TaskInfoProvider.class);
responseHolder = createMock(FullResponseHolder.class);
response = createMock(HttpResponse.class);
headers = createMock(HttpHeaders.class);
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider);
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT)).anyTimes();
expect(taskInfoProvider.getTaskStatus(TEST_ID)).andReturn(Optional.of(TaskStatus.running(TEST_ID))).anyTimes();
for (int i = 0; i < TEST_IDS.size(); i++) {
expect(taskInfoProvider.getTaskLocation(TEST_IDS.get(i))).andReturn(new TaskLocation(TEST_HOST, TEST_PORT))
.anyTimes();
expect(taskInfoProvider.getTaskStatus(TEST_IDS.get(i))).andReturn(Optional.of(TaskStatus.running(TEST_IDS.get(i))))
.anyTimes();
}
}
@Test(expected = KafkaIndexTaskClient.NoTaskLocationException.class)
public void testNoTaskLocationException() throws Exception
@After
public void tearDown() throws Exception
{
client.close();
}
@Test
public void testNoTaskLocation() throws Exception
{
reset(taskInfoProvider);
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(TaskLocation.unknown()).anyTimes();
expect(taskInfoProvider.getTaskStatus(TEST_ID)).andReturn(Optional.of(TaskStatus.running(TEST_ID))).anyTimes();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
expect(responseHolder.getContent()).andReturn("");
expect(httpClient.go(anyObject(Request.class), anyObject(FullResponseHandler.class))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
client.getCurrentOffsets(TEST_ID, true);
Assert.assertEquals(false, client.stop(TEST_ID, true));
Assert.assertEquals(false, client.resume(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10));
Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID));
Assert.assertEquals(null, client.getStartTime(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of()));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true));
verifyAll();
}
@Test(expected = KafkaIndexTaskClient.TaskNotRunnableException.class)
@@ -122,7 +160,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testInternalServerError() throws Exception
{
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
expect(httpClient.go(anyObject(Request.class), anyObject(FullResponseHandler.class))).andReturn(
expect(
httpClient.go(
anyObject(Request.class),
anyObject(FullResponseHandler.class),
eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -136,7 +180,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
{
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
expect(responseHolder.getContent()).andReturn("");
expect(httpClient.go(anyObject(Request.class), anyObject(FullResponseHandler.class))).andReturn(
expect(
httpClient.go(
anyObject(Request.class),
anyObject(FullResponseHandler.class),
eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -155,7 +205,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
.andReturn("{}");
expect(response.headers()).andReturn(headers);
expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id");
expect(httpClient.go(anyObject(Request.class), anyObject(FullResponseHandler.class))).andReturn(
expect(
httpClient.go(
anyObject(Request.class),
anyObject(FullResponseHandler.class),
eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
replayAll();
@@ -172,7 +228,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -196,58 +252,72 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
@Test
public void testGetCurrentOffsetsWithRetry() throws Exception
{
client = new RetryingTestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider);
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider, 3);
Capture<Request> captured = Capture.newInstance();
Capture<Request> captured2 = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2)
.andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("")
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6)
.andReturn(HttpResponseStatus.OK).times(1);
expect(responseHolder.getContent()).andReturn("").times(2)
.andReturn("{\"0\":1, \"1\":10}");
expect(responseHolder.getResponse()).andReturn(response);
expect(response.headers()).andReturn(headers);
expect(headers.get("X-Druid-Task-Id")).andReturn(TEST_ID);
expect(responseHolder.getResponse()).andReturn(response).times(2);
expect(response.headers()).andReturn(headers).times(2);
expect(headers.get("X-Druid-Task-Id")).andReturn(TEST_ID).times(2);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
expect(httpClient.go(capture(captured2), anyObject(FullResponseHandler.class))).andReturn(
Futures.immediateFuture(responseHolder)
);
).times(3);
replayAll();
Map<Integer, Long> results = client.getCurrentOffsets(TEST_ID, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.GET, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/current"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
request = captured2.getValue();
Assert.assertEquals(3, captured.getValues().size());
for (Request request : captured.getValues()) {
Assert.assertEquals(HttpMethod.GET, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/current"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
}
Assert.assertEquals(2, results.size());
Assert.assertEquals(1, (long) results.get(0));
Assert.assertEquals(10, (long) results.get(1));
}
@Test(expected = RuntimeException.class)
public void testGetCurrentOffsetsWithExhaustedRetries() throws Exception
{
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider, 2);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes();
expect(responseHolder.getContent()).andReturn("").anyTimes();
expect(responseHolder.getResponse()).andReturn(response).anyTimes();
expect(response.headers()).andReturn(headers).anyTimes();
expect(headers.get("X-Druid-Task-Id")).andReturn(TEST_ID).anyTimes();
expect(
httpClient.go(
anyObject(Request.class),
anyObject(FullResponseHandler.class),
eq(TEST_HTTP_TIMEOUT)
)
).andReturn(Futures.immediateFuture(responseHolder)).anyTimes();
replayAll();
client.getCurrentOffsets(TEST_ID, true);
verifyAll();
}
@Test
public void testGetEndOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -271,7 +341,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
@Test
public void testGetStartTime() throws Exception
{
client = new RetryingTestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider);
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider, 2);
DateTime now = DateTime.now();
Capture<Request> captured = Capture.newInstance();
@@ -281,7 +351,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
expect(response.headers()).andReturn(headers);
expect(headers.get("X-Druid-Task-Id")).andReturn(null);
expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
replayAll();
@@ -308,7 +378,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(responseHolder.getContent()).andReturn(String.format("\"%s\"", status.toString())).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -333,7 +403,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -360,7 +430,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -391,13 +461,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
.andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("\"PAUSED\"")
.andReturn("{\"0\":1, \"1\":10}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
expect(httpClient.go(capture(captured2), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured2), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
expect(httpClient.go(capture(captured3), anyObject(FullResponseHandler.class))).andReturn(
expect(httpClient.go(capture(captured3), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
@@ -437,8 +507,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testResume() throws Exception
{
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -461,8 +531,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Map<Integer, Long> endOffsets = ImmutableMap.of(0, 15L, 1, 120L);
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -486,8 +556,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Map<Integer, Long> endOffsets = ImmutableMap.of(0, 15L, 1, 120L);
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -509,8 +579,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testStop() throws Exception
{
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -531,8 +601,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testStopAndPublish() throws Exception
{
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class))).andReturn(
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
@@ -549,6 +619,345 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
}
@Test
public void testStopAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "stop")));
futures.add(client.stopAsync(TEST_IDS.get(i), false));
}
List<Boolean> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertTrue(responses.get(i));
}
}
@Test
public void testResumeAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "resume")));
futures.add(client.resumeAsync(TEST_IDS.get(i)));
}
List<Boolean> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertTrue(responses.get(i));
}
}
@Test
public void testPauseAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "pause")));
futures.add(client.pauseAsync(TEST_IDS.get(i)));
}
List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
}
}
@Test
public void testPauseAsyncWithTimeout() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "pause?timeout=9")));
futures.add(client.pauseAsync(TEST_IDS.get(i), 9));
}
List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
}
}
@Test
public void testGetStatusAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<KafkaIndexTask.Status>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "status")));
futures.add(client.getStatusAsync(TEST_IDS.get(i)));
}
List<KafkaIndexTask.Status> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.GET, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(KafkaIndexTask.Status.READING, responses.get(i));
}
}
@Test
public void testGetStartTimeAsync() throws Exception
{
final DateTime now = DateTime.now();
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<DateTime>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "time/start")));
futures.add(client.getStartTimeAsync(TEST_IDS.get(i)));
}
List<DateTime> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.GET, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(now, responses.get(i));
}
}
@Test
public void testGetCurrentOffsetsAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "offsets/current")));
futures.add(client.getCurrentOffsetsAsync(TEST_IDS.get(i), false));
}
List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.GET, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
}
}
@Test
public void testGetEndOffsetsAsync() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "offsets/end")));
futures.add(client.getEndOffsetsAsync(TEST_IDS.get(i)));
}
List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.GET, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
}
}
@Test
public void testSetEndOffsetsAsync() throws Exception
{
final Map<Integer, Long> endOffsets = ImmutableMap.of(0, 15L, 1, 120L);
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(new URL(String.format(URL_FORMATTER, TEST_HOST, TEST_PORT, TEST_IDS.get(i), "offsets/end")));
futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets));
}
List<Boolean> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertTrue(responses.get(i));
}
}
@Test
public void testSetEndOffsetsAsyncWithResume() throws Exception
{
final Map<Integer, Long> endOffsets = ImmutableMap.of(0, 15L, 1, 120L);
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
for (int i = 0; i < numRequests; i++) {
expectedUrls.add(
new URL(
String.format(
URL_FORMATTER,
TEST_HOST,
TEST_PORT,
TEST_IDS.get(i),
"offsets/end?resume=true"
)
)
);
futures.add(client.setEndOffsetsAsync(TEST_IDS.get(i), endOffsets, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertTrue(responses.get(i));
}
}
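The async tests above all exercise the same fan-out/gather pattern this commit introduces: one ListenableFuture per task, collected with Futures.allAsList so the caller synchronizes once per batch instead of once per task. Below is a minimal, self-contained sketch of that pattern in plain Guava; it is illustrative only and is not the production KafkaIndexTaskClient.
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
public class FanOutGatherSketch
{
  public static void main(String[] args) throws Exception
  {
    // Small pool standing in for the client's chat executor ("chatThreads").
    ListeningExecutorService chatExecutor =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
    List<ListenableFuture<String>> futures = Lists.newArrayList();
    for (final String taskId : Lists.newArrayList("task-0", "task-1", "task-2")) {
      // In the real client each submission performs one HTTP request to one task.
      futures.add(
          chatExecutor.submit(
              new Callable<String>()
              {
                @Override
                public String call()
                {
                  return taskId + " paused";
                }
              }
          )
      );
    }
    // Single synchronization point; the batch fails if any individual future fails.
    List<String> responses = Futures.allAsList(futures).get();
    System.out.println(responses);
    chatExecutor.shutdown();
  }
}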
private class TestableKafkaIndexTaskClient extends KafkaIndexTaskClient
{
public TestableKafkaIndexTaskClient(
@ -557,42 +966,20 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TaskInfoProvider taskInfoProvider
)
{
super(httpClient, jsonMapper, taskInfoProvider);
this(httpClient, jsonMapper, taskInfoProvider, TEST_NUM_RETRIES);
}
@Override
RetryPolicyFactory createRetryPolicyFactory()
public TestableKafkaIndexTaskClient(
HttpClient httpClient,
ObjectMapper jsonMapper,
TaskInfoProvider taskInfoProvider,
long numRetries
)
{
return new RetryPolicyFactory(
new RetryPolicyConfig()
.setMinWait(new Period("PT1S"))
.setMaxRetryCount(0)
);
super(httpClient, jsonMapper, taskInfoProvider, TEST_DATASOURCE, numThreads, TEST_HTTP_TIMEOUT, numRetries);
}
@Override
void checkConnection(String host, int port) throws IOException { }
}
private class RetryingTestableKafkaIndexTaskClient extends TestableKafkaIndexTaskClient
{
public RetryingTestableKafkaIndexTaskClient(
HttpClient httpClient,
ObjectMapper jsonMapper,
TaskInfoProvider taskInfoProvider
)
{
super(httpClient, jsonMapper, taskInfoProvider);
}
@Override
RetryPolicyFactory createRetryPolicyFactory()
{
return new RetryPolicyFactory(
new RetryPolicyConfig()
.setMinWait(new Period("PT1S"))
.setMaxRetryCount(1)
);
}
}
}


@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@ -101,4 +100,21 @@ public class KafkaTuningConfigTest
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
}
@Test
public void testCopyOf() throws Exception
{
KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L);
KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertEquals(true, copy.getBuildV9Directly());
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
}
}


@ -24,6 +24,7 @@ import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
@ -70,10 +71,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
@ -83,6 +83,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.ArrayList;
@ -95,50 +96,61 @@ import java.util.concurrent.Executor;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
@RunWith(EasyMockRunner.class)
@RunWith(Parameterized.class)
public class KafkaSupervisorTest extends EasyMockSupport
{
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
private static final String KAFKA_TOPIC = "testTopic";
private static final String DATASOURCE = "testDS";
private static final int NUM_PARTITIONS = 3;
private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private int numThreads;
private TestingCluster zkServer;
private TestBroker kafkaServer;
private KafkaSupervisor supervisor;
private String kafkaHost;
private DataSchema dataSchema;
private KafkaTuningConfig tuningConfig;
@Mock
private KafkaSupervisorTuningConfig tuningConfig;
private TaskStorage taskStorage;
@Mock
private TaskMaster taskMaster;
@Mock
private TaskRunner taskRunner;
@Mock
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@Mock
private KafkaIndexTaskClient taskClient;
@Mock
private TaskQueue taskQueue;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Parameterized.Parameters(name = "numThreads = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{1}, new Object[]{8});
}
public KafkaSupervisorTest(int numThreads)
{
this.numThreads = numThreads;
}
@Before
public void setUp() throws Exception
{
taskStorage = createMock(TaskStorage.class);
taskMaster = createMock(TaskMaster.class);
taskRunner = createMock(TaskRunner.class);
indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class);
taskClient = createMock(KafkaIndexTaskClient.class);
taskQueue = createMock(TaskQueue.class);
zkServer = new TestingCluster(1);
zkServer.start();
@ -152,7 +164,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
kafkaHost = String.format("localhost:%d", kafkaServer.getPort());
dataSchema = getDataSchema(DATASOURCE);
tuningConfig = new KafkaTuningConfig(
tuningConfig = new KafkaSupervisorTuningConfig(
1000,
50000,
new Period("P1Y"),
@ -161,7 +173,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
true,
false,
null
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT
);
}
@ -202,7 +218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(dataSchema, task.getDataSchema());
Assert.assertEquals(tuningConfig, task.getTuningConfig());
Assert.assertEquals(KafkaTuningConfig.copyOf(tuningConfig), task.getTuningConfig());
KafkaIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers"));
@ -485,9 +501,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTime.now())).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
@ -568,9 +584,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id4")).andReturn(Optional.of(id3)).anyTimes();
expect(taskStorage.getTask("id5")).andReturn(Optional.of(id3)).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTime.now())).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
@ -598,9 +614,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTime.now())).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
@ -681,8 +697,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
.anyTimes();
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(now)).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true);
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@ -705,14 +721,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask iHaveFailed = (KafkaIndexTask) existingTasks.get(0);
reset(taskStorage);
reset(taskQueue);
reset(taskClient);
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
expect(taskStorage.getStatus(runningTaskId)).andReturn(Optional.of(TaskStatus.running(runningTaskId))).anyTimes();
expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of((Task) iHaveFailed)).anyTimes();
expect(taskStorage.getTask(runningTaskId)).andReturn(Optional.of(captured.getValue())).anyTimes();
expect(taskClient.getStatusAsync(runningTaskId)).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync(runningTaskId)).andReturn(Futures.immediateFuture(now)).anyTimes();
expect(taskQueue.add(capture(aNewTaskCapture))).andReturn(true);
replay(taskStorage);
replay(taskQueue);
replay(taskClient);
supervisor.runInternal();
verifyAll();
@ -739,9 +759,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(taskClient.getStatus(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id")).anyTimes();
expect(taskClient.getStartTime(anyString())).andThrow(taskClient.new NoTaskLocationException("test-id"))
expect(taskClient.getStatusAsync(anyString())).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED))
.anyTimes();
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTime.now())).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
@ -776,6 +796,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask iAmSuccess = (KafkaIndexTask) tasks.get(0);
reset(taskStorage);
reset(taskQueue);
reset(taskClient);
expect(taskStorage.getActiveTasks()).andReturn(imStillRunning).anyTimes();
for (Task task : imStillRunning) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
@ -783,10 +804,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
expect(taskStorage.getStatus(iAmSuccess.getId())).andReturn(Optional.of(TaskStatus.success(iAmSuccess.getId())));
expect(taskStorage.getTask(iAmSuccess.getId())).andReturn(Optional.of((Task) iAmSuccess)).anyTimes();
taskQueue.shutdown(capture(shutdownTaskIdCapture));
expect(taskQueue.add(capture(newTasksCapture))).andReturn(true).times(2);
expect(taskClient.stopAsync(capture(shutdownTaskIdCapture), eq(false))).andReturn(Futures.immediateFuture(true));
replay(taskStorage);
replay(taskQueue);
replay(taskClient);
supervisor.runInternal();
verifyAll();
@ -835,24 +857,25 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.getStatus(anyString()))
.andReturn(KafkaIndexTask.Status.READING)
expect(taskClient.getStatusAsync(anyString()))
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
.anyTimes();
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-0")))
.andReturn(DateTime.now().minusMinutes(2))
.andReturn(DateTime.now());
expect(taskClient.getStartTime(EasyMock.contains("sequenceName-1")))
.andReturn(DateTime.now())
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture(DateTime.now().minusMinutes(2)))
.andReturn(Futures.immediateFuture(DateTime.now()));
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
.andReturn(Futures.immediateFuture(DateTime.now()))
.times(2);
expect(taskClient.pause(EasyMock.contains("sequenceName-0")))
.andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L))
.andReturn(ImmutableMap.of(0, 10L, 1, 15L, 2, 35L));
taskClient.setEndOffsets(
expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 15L, 2, 35L)));
expect(
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true)
);
expectLastCall().times(2);
)
).andReturn(Futures.immediateFuture(true)).times(2);
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
@ -863,7 +886,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
for (Task task : captured.getValues()) {
KafkaIndexTask kafkaIndexTask = (KafkaIndexTask) task;
Assert.assertEquals(dataSchema, kafkaIndexTask.getDataSchema());
Assert.assertEquals(tuningConfig, kafkaIndexTask.getTuningConfig());
Assert.assertEquals(KafkaTuningConfig.copyOf(tuningConfig), kafkaIndexTask.getTuningConfig());
KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
@ -909,8 +932,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING);
expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskQueue.add(capture(captured))).andReturn(true);
@ -936,7 +960,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(0, payload.getActiveTasks().size());
Assert.assertEquals(1, payload.getPublishingTasks().size());
KafkaSupervisorReport.TaskReportData publishingReport = payload.getPublishingTasks().get(0);
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id1", publishingReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets());
@ -944,7 +968,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask capturedTask = captured.getValue();
Assert.assertEquals(dataSchema, capturedTask.getDataSchema());
Assert.assertEquals(tuningConfig, capturedTask.getTuningConfig());
Assert.assertEquals(KafkaTuningConfig.copyOf(tuningConfig), capturedTask.getTuningConfig());
KafkaIOConfig capturedTaskConfig = capturedTask.getIOConfig();
Assert.assertEquals(kafkaHost, capturedTaskConfig.getConsumerProperties().get("bootstrap.servers"));
@ -997,8 +1021,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING);
expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskQueue.add(capture(captured))).andReturn(true);
@ -1024,7 +1049,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(0, payload.getActiveTasks().size());
Assert.assertEquals(1, payload.getPublishingTasks().size());
KafkaSupervisorReport.TaskReportData publishingReport = payload.getPublishingTasks().get(0);
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id1", publishingReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 2, 0L), publishingReport.getStartingOffsets());
@ -1032,7 +1057,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask capturedTask = captured.getValue();
Assert.assertEquals(dataSchema, capturedTask.getDataSchema());
Assert.assertEquals(tuningConfig, capturedTask.getTuningConfig());
Assert.assertEquals(KafkaTuningConfig.copyOf(tuningConfig), capturedTask.getTuningConfig());
KafkaIOConfig capturedTaskConfig = capturedTask.getIOConfig();
Assert.assertEquals(kafkaHost, capturedTaskConfig.getConsumerProperties().get("bootstrap.servers"));
@ -1098,12 +1123,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING);
expect(taskClient.getStatus("id2")).andReturn(KafkaIndexTask.Status.READING);
expect(taskClient.getStartTime("id2")).andReturn(startTime);
expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsetsAsync("id1", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
expect(taskClient.getCurrentOffsets("id2", false)).andReturn(ImmutableMap.of(0, 40L, 1, 50L, 2, 60L));
expect(taskClient.getCurrentOffsetsAsync("id2", false))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 40L, 1, 50L, 2, 60L)));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
@ -1127,8 +1154,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, payload.getActiveTasks().size());
Assert.assertEquals(1, payload.getPublishingTasks().size());
KafkaSupervisorReport.TaskReportData activeReport = payload.getActiveTasks().get(0);
KafkaSupervisorReport.TaskReportData publishingReport = payload.getPublishingTasks().get(0);
TaskReportData activeReport = payload.getActiveTasks().get(0);
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
Assert.assertEquals("id2", activeReport.getId());
Assert.assertEquals(startTime, activeReport.getStartTime());
@ -1140,6 +1167,191 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), publishingReport.getCurrentOffsets());
}
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
supervisor = getSupervisor(2, 2, true, "PT1H", null);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(4);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
List<Task> tasks = captured.getValues();
reset(taskStorage, taskClient, taskQueue);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
expect(taskClient.getStatusAsync(task.getId()))
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.NOT_STARTED));
expect(taskClient.getStartTimeAsync(task.getId()))
.andReturn(Futures.<DateTime>immediateFailedFuture(new RuntimeException()));
taskQueue.shutdown(task.getId());
}
replay(taskStorage, taskClient, taskQueue);
supervisor.runInternal();
verifyAll();
}
@Test
public void testKillUnresponsiveTasksWhilePausing() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M", null);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(4);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
List<Task> tasks = captured.getValues();
Collection workItems = new ArrayList<>();
for (Task task : tasks) {
workItems.add(new TestTaskRunnerWorkItem(task.getId(), null, location));
}
reset(taskStorage, taskRunner, taskClient, taskQueue);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.getStatusAsync(anyString()))
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
.anyTimes();
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture(DateTime.now().minusMinutes(2)))
.andReturn(Futures.immediateFuture(DateTime.now()));
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
.andReturn(Futures.immediateFuture(DateTime.now()))
.times(2);
expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.<Map<Integer, Long>>immediateFailedFuture(new RuntimeException())).times(2);
taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
expectLastCall().times(2);
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
supervisor.runInternal();
verifyAll();
for (Task task : captured.getValues()) {
KafkaIOConfig taskConfig = ((KafkaIndexTask) task).getIOConfig();
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(2));
}
}
@Test
public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234);
supervisor = getSupervisor(2, 2, true, "PT1M", null);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskQueue.add(capture(captured))).andReturn(true).times(4);
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
List<Task> tasks = captured.getValues();
Collection workItems = new ArrayList<>();
for (Task task : tasks) {
workItems.add(new TestTaskRunnerWorkItem(task.getId(), null, location));
}
reset(taskStorage, taskRunner, taskClient, taskQueue);
captured = Capture.newInstance(CaptureType.ALL);
expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
for (Task task : tasks) {
expect(taskStorage.getStatus(task.getId())).andReturn(Optional.of(TaskStatus.running(task.getId()))).anyTimes();
expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.getStatusAsync(anyString()))
.andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
.anyTimes();
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture(DateTime.now().minusMinutes(2)))
.andReturn(Futures.immediateFuture(DateTime.now()));
expect(taskClient.getStartTimeAsync(EasyMock.contains("sequenceName-1")))
.andReturn(Futures.immediateFuture(DateTime.now()))
.times(2);
expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 10L, 1, 15L, 2, 35L)));
expect(
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true)
)
).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
taskQueue.shutdown(EasyMock.contains("sequenceName-0"));
expectLastCall().times(2);
expect(taskQueue.add(capture(captured))).andReturn(true).times(2);
replay(taskStorage, taskRunner, taskClient, taskQueue);
supervisor.runInternal();
verifyAll();
for (Task task : captured.getValues()) {
KafkaIOConfig taskConfig = ((KafkaIndexTask) task).getIOConfig();
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(2));
}
}
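The three kill tests above share one contract: when the future for a chat operation (get start time, pause, set end offsets) fails after the configured retries, the supervisor treats the task as unresponsive, shuts it down through the task queue, and a replacement starts from the last committed offsets on the next run (the assertions on getStartPartitions() check the restart from offset 0). A hedged sketch of that callback wiring follows, using an illustrative QueueLike interface rather than Druid's actual TaskQueue type.
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
public class KillUnresponsiveSketch
{
  // Illustrative stand-in for the piece of the task queue this pattern needs.
  interface QueueLike
  {
    void shutdown(String taskId);
  }
  static void handlePauseResult(
      final String taskId,
      ListenableFuture<Map<Integer, Long>> pauseFuture,
      final QueueLike taskQueue
  )
  {
    Futures.addCallback(
        pauseFuture,
        new FutureCallback<Map<Integer, Long>>()
        {
          @Override
          public void onSuccess(Map<Integer, Long> currentOffsets)
          {
            // Normal path: derive end offsets from the paused positions.
          }
          @Override
          public void onFailure(Throwable t)
          {
            // Retries exhausted: the task is unresponsive, so kill it;
            // the next supervisor run creates a replacement task.
            taskQueue.shutdown(taskId);
          }
        }
    );
  }
}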
@Test(expected = IllegalStateException.class)
public void testStopNotStarted() throws Exception
{
@ -1151,6 +1363,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testStop() throws Exception
{
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
taskClient.close();
taskRunner.unregisterListener(String.format("KafkaSupervisor-%s", DATASOURCE));
replayAll();
@ -1161,6 +1374,91 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testStopGracefully() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234);
final TaskLocation location2 = new TaskLocation("testHost2", 145);
final DateTime startTime = new DateTime();
supervisor = getSupervisor(2, 1, true, "PT1H", null);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Task id3 = createKafkaIndexTask(
"id3",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1.getId(), null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2.getId(), null, location2));
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.PUBLISHING));
expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
reset(taskRunner, taskClient, taskQueue);
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
.andReturn(Futures.immediateFuture(true));
taskQueue.shutdown("id3");
expectLastCall().times(2);
replay(taskRunner, taskClient, taskQueue);
supervisor.gracefulShutdownInternal();
verifyAll();
}
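testStopGracefully encodes the shutdown decision tree: publishing tasks are left alone to finish handoff, reading tasks with a reachable location are paused and handed their current offsets back as end offsets with resume=true so they publish what they have, and tasks with no reachable location are shut down outright. A minimal sketch of that decision logic, with hypothetical interfaces standing in for the supervisor's internals:
import java.util.Map;
public class GracefulShutdownSketch
{
  enum Status { PUBLISHING, READING }
  // Illustrative stand-ins, not Druid classes.
  interface ChatClient
  {
    Map<Integer, Long> pause(String taskId);
    boolean setEndOffsets(String taskId, Map<Integer, Long> offsets, boolean resume);
  }
  interface QueueLike
  {
    void shutdown(String taskId);
  }
  static void stopTask(String taskId, Status status, boolean reachable, ChatClient chat, QueueLike queue)
  {
    if (status == Status.PUBLISHING) {
      return; // already handing off; let it finish
    }
    if (reachable) {
      // Freeze the task at its current position, then let it publish and exit.
      Map<Integer, Long> current = chat.pause(taskId);
      chat.setEndOffsets(taskId, current, true);
    } else {
      queue.shutdown(taskId); // no location to talk to; kill it
    }
  }
}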
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@ -1203,8 +1501,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null)
{
@Override
public KafkaIndexTaskClient build(TaskInfoProvider taskLocationProvider)
public KafkaIndexTaskClient build(
TaskInfoProvider taskInfoProvider,
String dataSource,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
return taskClient;
}
};


@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.kafka.KafkaIndexTaskModule;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
public class KafkaSupervisorTuningConfigTest
{
private final ObjectMapper mapper;
public KafkaSupervisorTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
}
@Test
public void testSerdeWithDefaults() throws Exception
{
String jsonStr = "{\"type\": \"kafka\"}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(false, config.getBuildV9Directly());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertNull(config.getChatThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout());
}
@Test
public void testSerdeWithNonDefaults() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"kafka\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"buildV9Directly\": true,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"workerThreads\": 12,\n"
+ " \"chatThreads\": 13,\n"
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\"\n"
+ "}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class
)
),
TuningConfig.class
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.getBuildV9Directly());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(12, (int) config.getWorkerThreads());
Assert.assertEquals(13, (int) config.getChatThreads());
Assert.assertEquals(14L, (long) config.getChatRetries());
Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout());
}
}


@ -23,8 +23,8 @@ RUN wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeep
&& ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
# Kafka
RUN wget -q -O - http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz | tar -xzf - -C /usr/local \
&& ln -s /usr/local/kafka_2.10-0.8.2.0 /usr/local/kafka
RUN wget -q -O - http://www.us.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz | tar -xzf - -C /usr/local \
&& ln -s /usr/local/kafka_2.10-0.9.0.1 /usr/local/kafka
# Druid system user
RUN adduser --system --group --no-create-home druid \


@ -65,6 +65,11 @@
<artifactId>mysql-metadata-storage</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-indexing-service</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-services</artifactId>
@ -109,6 +114,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
<build>


@ -117,7 +117,8 @@ public class OverlordResourceTestClient
public TaskStatus.Status getTaskStatus(String taskID)
{
try {
StatusResponseHolder response = makeRequest( HttpMethod.GET,
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
String.format(
"%stask/%s/status",
getIndexerURL(),
@ -158,7 +159,8 @@ public class OverlordResourceTestClient
private List<TaskResponseObject> getTasks(String identifier)
{
try {
StatusResponseHolder response = makeRequest( HttpMethod.GET,
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
String.format("%s%s", getIndexerURL(), identifier)
);
LOG.info("Tasks %s response %s", identifier, response.getContent());
@ -176,8 +178,10 @@ public class OverlordResourceTestClient
public Map<String, String> shutDownTask(String taskID)
{
try {
StatusResponseHolder response = makeRequest( HttpMethod.POST,
String.format("%stask/%s/shutdown", getIndexerURL(),
StatusResponseHolder response = makeRequest(
HttpMethod.POST,
String.format(
"%stask/%s/shutdown", getIndexerURL(),
URLEncoder.encode(taskID, "UTF-8")
)
);
@ -220,6 +224,61 @@ public class OverlordResourceTestClient
);
}
public String submitSupervisor(String spec)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "supervisor"))
.setContent(
"application/json",
spec.getBytes()
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting supervisor to overlord, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
Map<String, String> responseData = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, String>>()
{
}
);
String id = responseData.get("id");
LOG.info("Submitted supervisor with id[%s]", id);
return id;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void shutdownSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST, new URL(String.format("%ssupervisor/%s/shutdown", getIndexerURL(), id))
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while shutting down supervisor, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
LOG.info("Shutdown supervisor with id[%s]", id);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {


@ -0,0 +1,276 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.tests.indexer;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import io.druid.testing.utils.TestQueryHelper;
import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
/*
* This is a test for the Kafka indexing service.
*/
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
private static final String DATASOURCE = "kafka_indexing_service_test";
private static final String TOPIC_NAME = "kafka_indexing_service_topic";
private static final int MINUTES_TO_SEND = 4;
// We'll fill in the current time and numbers for added, deleted and changed
// before sending the event.
final String event_template =
"{\"timestamp\": \"%s\"," +
"\"page\": \"Gypsy Danger\"," +
"\"language\" : \"en\"," +
"\"user\" : \"nuclear\"," +
"\"unpatrolled\" : \"true\"," +
"\"newPage\" : \"true\"," +
"\"robot\": \"false\"," +
"\"anonymous\": \"false\"," +
"\"namespace\":\"article\"," +
"\"continent\":\"North America\"," +
"\"country\":\"United States\"," +
"\"region\":\"Bay Area\"," +
"\"city\":\"San Francisco\"," +
"\"added\":%d," +
"\"deleted\":%d," +
"\"delta\":%d}";
private String supervisorId;
private ZkClient zkClient;
private Boolean segmentsExist; // to tell if we should remove segments during teardown
// format for the querying interval
private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
// format for the expected timestamp in a query response
private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
private DateTime dtFirst; // timestamp of 1st event
private DateTime dtLast; // timestamp of last event
@Inject
private TestQueryHelper queryHelper;
@Inject
private IntegrationTestingConfig config;
@Test
public void testKafka()
{
LOG.info("Starting test: ITKafkaIndexingServiceTest");
// create topic
try {
int sessionTimeoutMs = 10000;
int connectionTimeoutMs = 10000;
String zkHosts = config.getZookeeperHosts();
zkClient = new ZkClient(
zkHosts, sessionTimeoutMs, connectionTimeoutMs,
ZKStringSerializer$.MODULE$
);
int numPartitions = 4;
int replicationFactor = 1;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkClient, TOPIC_NAME, numPartitions, replicationFactor, topicConfig);
}
catch (TopicExistsException e) {
// it's ok if the topic already exists
}
catch (Exception e) {
throw new ISE(e, "could not create kafka topic");
}
String spec;
try {
LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
spec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%KAFKA_BROKER%%", config.getKafkaHost());
LOG.info("supervisorSpec: [%s]\n", spec);
}
catch (Exception e) {
LOG.error("could not read file [%s]", INDEXER_FILE);
throw new ISE(e, "could not read file [%s]", INDEXER_FILE);
}
// start supervisor
supervisorId = indexer.submitSupervisor(spec);
LOG.info("Submitted supervisor");
// set up kafka producer
Properties properties = new Properties();
properties.put("bootstrap.servers", config.getKafkaHost());
LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
properties.put("acks", "all");
properties.put("retries", "3");
KafkaProducer<String, String> producer = new KafkaProducer<>(
properties,
new StringSerializer(),
new StringSerializer()
);
DateTimeZone zone = DateTimeZone.forID("UTC");
// format for putting into events
DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
DateTime dt = new DateTime(zone); // timestamp to put on events
dtFirst = dt; // timestamp of 1st event
dtLast = dt; // timestamp of last event
// stop sending events when time passes this
DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
// these are used to compute the expected aggregations
int added = 0;
int num_events = 0;
// send data to kafka
while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
num_events++;
added += num_events;
// construct the event to send
String event = String.format(event_template, event_fmt.print(dt), num_events, 0, num_events);
LOG.info("sending event: [%s]", event);
try {
producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
}
catch (Exception ioe) {
throw Throwables.propagate(ioe);
}
try {
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
}
catch (InterruptedException ex) { /* nothing */ }
dtLast = dt;
dt = new DateTime(zone);
}
producer.close();
InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
if (null == is) {
throw new ISE("could not open query file: %s", QUERIES_FILE);
}
// put the timestamps into the query structure
String query_response_template;
try {
query_response_template = IOUtils.toString(is, "UTF-8");
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
}
String queryStr = query_response_template
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
.replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
// this query will probably be answered from the indexing tasks, but possibly from 2 historical segments / 2 indexing tasks
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
indexer.shutdownSupervisor(supervisorId);
// wait for segments to be handed off
try {
RetryUtil.retryUntil(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return coordinator.areSegmentsLoaded(DATASOURCE);
}
},
true,
30000,
10,
"Real-time generated segments loaded"
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
LOG.info("segments are present");
segmentsExist = true;
// this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@AfterClass
public void afterClass() throws Exception
{
LOG.info("teardown");
// delete kafka topic
AdminUtils.deleteTopic(zkClient, TOPIC_NAME);
// remove segments
if (segmentsExist) {
unloadAndKillData(DATASOURCE);
}
}
}


@ -125,23 +125,19 @@ public class ITKafkaTest extends AbstractIndexerTest
throw new ISE(e, "could not create kafka topic");
}
String indexerSpec = "";
String indexerSpec;
// replace temp strings in indexer file
try {
LOG.info("indexerFile name: [%s]", INDEXER_FILE);
indexerSpec = getTaskAsString(INDEXER_FILE);
indexerSpec = indexerSpec.replaceAll("%%TOPIC%%", TOPIC_NAME);
indexerSpec = indexerSpec.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts());
indexerSpec = indexerSpec.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()));
indexerSpec = indexerSpec.replaceAll(
indexerSpec = getTaskAsString(INDEXER_FILE)
.replaceAll("%%DATASOURCE%%", DATASOURCE)
.replaceAll("%%TOPIC%%", TOPIC_NAME)
.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
.replaceAll(
"%%SHUTOFFTIME%%",
new DateTime(
System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(
2
* MINUTES_TO_SEND
)
).toString()
new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2 * MINUTES_TO_SEND)).toString()
);
LOG.info("indexerFile: [%s]\n", indexerSpec);
}
@ -217,11 +213,13 @@ public class ITKafkaTest extends AbstractIndexerTest
try {
query_response_template = IOUtils.toString(is, "UTF-8");
} catch (IOException e) {
}
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
}
String queryStr = query_response_template
.replaceAll("%%DATASOURCE%%", DATASOURCE)
// time boundary
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
@ -236,7 +234,8 @@ public class ITKafkaTest extends AbstractIndexerTest
// this query will probably be answered from the realtime task
try {
this.queryHelper.testQueriesFromString(queryStr, 2);
} catch (Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}


@ -3,7 +3,7 @@
"description": "timeBoundary",
"query": {
"queryType":"timeBoundary",
"dataSource":"kafka_test"
"dataSource":"%%DATASOURCE%%"
},
"expectedResults":[
{
@ -19,7 +19,7 @@
"description": "timeseries",
"query": {
"queryType": "timeseries",
"dataSource": "kafka_test",
"dataSource": "%%DATASOURCE%%",
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
"granularity": "all",
"aggregations": [


@ -2,7 +2,7 @@
"type" : "index_realtime",
"spec" : {
"dataSchema": {
"dataSource": "kafka_test",
"dataSource": "%%DATASOURCE%%",
"parser" : {
"type" : "string",
"parseSpec" : {


@ -0,0 +1,63 @@
{
"type": "kafka",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"intermediatePersistPeriod": "PT30S",
"maxRowsPerSegment": 5000000,
"maxRowsInMemory": 500000
},
"ioConfig": {
"topic": "%%TOPIC%%",
"consumerProperties": {
"bootstrap.servers": "%%KAFKA_BROKER%%"
},
"taskCount": 2,
"replicas": 1,
"taskDuration": "PT2M",
"useEarliestOffset": true
}
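A spec like the one above is what ITKafkaIndexingServiceTest submits through OverlordResourceTestClient.submitSupervisor(). Outside the test harness the same submission is a single HTTP POST; the sketch below assumes the overlord's conventional base path and port (http://localhost:8090/druid/indexer/v1/), which are not part of this diff, so substitute your own.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
public class SubmitSupervisorSketch
{
  public static void main(String[] args) throws Exception
  {
    // Assumed overlord location; adjust host and port for your deployment.
    URL endpoint = new URL("http://localhost:8090/druid/indexer/v1/supervisor");
    byte[] spec = Files.readAllBytes(Paths.get("kafka_supervisor_spec.json"));
    HttpURLConnection conn = (HttpURLConnection) endpoint.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(spec);
    }
    // On success the overlord responds 200 with a body like {"id":"<supervisor id>"}.
    System.out.println("HTTP " + conn.getResponseCode());
  }
}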
}