diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java
index 68ae5e4b3c6..137c7242555 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java
@@ -21,7 +21,6 @@ package io.druid.tests.indexer;
 
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
-
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.testing.IntegrationTestingConfig;
@@ -48,7 +47,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 /*
  * This is a test for the kafka firehose.
@@ -126,32 +124,6 @@ public class ITKafkaTest extends AbstractIndexerTest
       throw new ISE(e, "could not create kafka topic");
     }
 
-    String indexerSpec;
-
-    // replace temp strings in indexer file
-    try {
-      LOG.info("indexerFile name: [%s]", INDEXER_FILE);
-      indexerSpec = getTaskAsString(INDEXER_FILE)
-          .replaceAll("%%DATASOURCE%%", DATASOURCE)
-          .replaceAll("%%TOPIC%%", TOPIC_NAME)
-          .replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
-          .replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
-          .replaceAll(
-              "%%SHUTOFFTIME%%",
-              new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2 * MINUTES_TO_SEND)).toString()
-          );
-      LOG.info("indexerFile: [%s]\n", indexerSpec);
-    }
-    catch (Exception e) {
-      // log here so the message will appear in the console output
-      LOG.error("could not read indexer file [%s]", INDEXER_FILE);
-      throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
-    }
-
-    // start indexing task
-    taskID = indexer.submitTask(indexerSpec);
-    LOG.info("-------------SUBMITTED TASK");
-
     // set up kafka producer
     Properties properties = new Properties();
     properties.put("metadata.broker.list", config.getKafkaHost());
@@ -169,21 +141,18 @@ public class ITKafkaTest extends AbstractIndexerTest
     DateTime dt = new DateTime(zone); // timestamp to put on events
     dtFirst = dt;                     // timestamp of 1st event
     dtLast = dt;                      // timestamp of last event
-    // stop sending events when time passes this
-    DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
 
     // these are used to compute the expected aggregations
     int added = 0;
-    int num_events = 0;
+    int num_events = 10;
 
     // send data to kafka
-    while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
-      num_events++;
-      added += num_events;
+    for (int i = 0; i < num_events; i++) {
+      added += i;
       // construct the event to send
       String event = String.format(
           event_template,
-          event_fmt.print(dt), num_events, 0, num_events
+          event_fmt.print(dt), i, 0, i
       );
       LOG.info("sending event: [%s]", event);
       try {
@@ -195,16 +164,61 @@ public class ITKafkaTest extends AbstractIndexerTest
         throw Throwables.propagate(ioe);
       }
 
-      try {
-        Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
-      }
-      catch (InterruptedException ex) { /* nothing */ }
-
       dtLast = dt;
       dt = new DateTime(zone);
     }
 
     producer.close();
 
+    String indexerSpec;
+
+    // replace temp strings in indexer file
+    try {
+      LOG.info("indexerFile name: [%s]", INDEXER_FILE);
+      indexerSpec = getTaskAsString(INDEXER_FILE)
+          .replaceAll("%%DATASOURCE%%", DATASOURCE)
+          .replaceAll("%%TOPIC%%", TOPIC_NAME)
+          .replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts())
+          .replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()))
+          .replaceAll("%%COUNT%%", Integer.toString(num_events));
+      LOG.info("indexerFile: [%s]\n", indexerSpec);
+    }
+    catch (Exception e) {
+      // log here so the message will appear in the console output
+      LOG.error("could not read indexer file [%s]", INDEXER_FILE);
+      throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
+    }
+
+    // start indexing task
+    taskID = indexer.submitTask(indexerSpec);
+    LOG.info("-------------SUBMITTED TASK");
+
+    // wait for the task to finish
+    indexer.waitUntilTaskCompletes(taskID, 20000, 30);
+
+    // wait for segments to be handed off
+    try {
+      RetryUtil.retryUntil(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return coordinator.areSegmentsLoaded(DATASOURCE);
+            }
+          },
+          true,
+          30000,
+          10,
+          "Real-time generated segments loaded"
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    LOG.info("segments are present");
+    segmentsExist = true;
+
     // put the timestamps into the query structure
     String query_response_template = null;
     InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
@@ -239,37 +253,6 @@ public class ITKafkaTest extends AbstractIndexerTest
     catch (Exception e) {
       throw Throwables.propagate(e);
     }
-
-    // wait for segments to be handed off
-    try {
-      RetryUtil.retryUntil(
-          new Callable<Boolean>()
-          {
-            @Override
-            public Boolean call() throws Exception
-            {
-              return coordinator.areSegmentsLoaded(DATASOURCE);
-            }
-          },
-          true,
-          30000,
-          10,
-          "Real-time generated segments loaded"
-      );
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    LOG.info("segments are present");
-    segmentsExist = true;
-
-    // this query will be answered by historical
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
   }
 
   @AfterClass
@@ -277,9 +260,6 @@ public class ITKafkaTest extends AbstractIndexerTest
   {
     LOG.info("teardown");
 
-    // wait for the task to complete
-    indexer.waitUntilTaskCompletes(taskID);
-
     // delete kafka topic
     AdminUtils.deleteTopic(zkClient, TOPIC_NAME);
 
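Note on the handoff wait added above: `RetryUtil.retryUntil(...)` polls `coordinator.areSegmentsLoaded(DATASOURCE)` until it returns `true`. A minimal sketch of the polling semantics the call site appears to assume (fixed delay between attempts, hard failure once the retry budget is spent); the real `RetryUtil` helper may differ in details such as backoff or logging:

```java
import java.util.concurrent.Callable;

public class RetryUntilSketch
{
  // Sketch only: assumes the arguments mean (condition, expected value,
  // delay between attempts in millis, max attempts, description).
  public static void retryUntil(
      Callable<Boolean> condition,
      boolean expected,
      long delayMillis,
      int maxTries,
      String taskDescription
  ) throws Exception
  {
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      if (condition.call() == expected) {
        return; // e.g. segments are loaded and queryable
      }
      Thread.sleep(delayMillis);
    }
    throw new IllegalStateException("Condition never met: " + taskDescription);
  }
}
```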
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json
index 282133af53f..9674570c805 100644
--- a/integration-tests/src/test/resources/indexer/kafka_index_task.json
+++ b/integration-tests/src/test/resources/indexer/kafka_index_task.json
@@ -48,8 +48,8 @@
   "ioConfig" : {
     "type" : "realtime",
     "firehose": {
-      "type": "timed",
-      "shutoffTime": "%%SHUTOFFTIME%%",
+      "type": "fixedCount",
+      "count": "%%COUNT%%",
       "delegate": {
         "type": "kafka-0.8",
         "consumerProps": {
diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java
index 670121240a7..dd8a605af96 100644
--- a/server/src/main/java/io/druid/guice/FirehoseModule.java
+++ b/server/src/main/java/io/druid/guice/FirehoseModule.java
@@ -27,6 +27,7 @@ import io.druid.initialization.DruidModule;
 import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
 import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
+import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
 import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -54,7 +55,8 @@ public class FirehoseModule implements DruidModule
             new NamedType(IrcFirehoseFactory.class, "irc"),
             new NamedType(LocalFirehoseFactory.class, "local"),
             new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
-            new NamedType(CombiningFirehoseFactory.class, "combining")
+            new NamedType(CombiningFirehoseFactory.class, "combining"),
+            new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
         )
     );
   }
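The `"type": "fixedCount"` key in the task JSON resolves to `FixedCountFirehoseFactory` through the `NamedType` registration above. A self-contained sketch of that Jackson mechanism, using hypothetical stand-in types (`Factory`, `FixedCount`) rather than Druid's real classes; note that Jackson coerces the quoted count string to an `int`, which is why the task JSON can keep `%%COUNT%%` in quotes:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

public class NamedTypeSketch
{
  // Hypothetical stand-in for FirehoseFactory: the "type" JSON field
  // selects the concrete subtype, as in FirehoseModule's registrations.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface Factory
  {
  }

  static class FixedCount implements Factory
  {
    @JsonProperty
    int count;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // analogous to new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
    mapper.registerSubtypes(new NamedType(FixedCount.class, "fixedCount"));

    Factory f = mapper.readValue("{\"type\": \"fixedCount\", \"count\": \"10\"}", Factory.class);
    System.out.println(((FixedCount) f).count); // 10 - quoted string coerced to int
  }
}
```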
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
new file mode 100644
index 00000000000..300b6d13922
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.FirehoseFactory;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.InputRowParser;
+
+import java.io.IOException;
+
+/**
+ * Firehose to give out only first n events from the delegate firehose.
+ */
+public class FixedCountFirehoseFactory implements FirehoseFactory
+{
+  private final FirehoseFactory delegate;
+  private final int count;
+
+  @JsonCreator
+  public FixedCountFirehoseFactory(
+      @JsonProperty("delegate") FirehoseFactory delegate,
+      @JsonProperty("count") int count
+  )
+  {
+    this.delegate = delegate;
+    this.count = count;
+  }
+
+  @JsonProperty
+  public FirehoseFactory getDelegate()
+  {
+    return delegate;
+  }
+
+  @JsonProperty
+  public int getCount()
+  {
+    return count;
+  }
+
+  @Override
+  public Firehose connect(final InputRowParser parser) throws IOException
+  {
+    return new Firehose()
+    {
+      private int i = 0;
+      private Firehose delegateFirehose = delegate.connect(parser);
+
+      @Override
+      public boolean hasMore()
+      {
+        return i < count && delegateFirehose.hasMore();
+      }
+
+      @Override
+      public InputRow nextRow()
+      {
+        Preconditions.checkArgument(i++ < count, "Max events limit reached.");
+        return delegateFirehose.nextRow();
+      }
+
+      @Override
+      public Runnable commit()
+      {
+        return delegateFirehose.commit();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        delegateFirehose.close();
+      }
+    };
+  }
+}
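For illustration, a self-contained sketch of the decorator pattern the new factory applies, with a simplified hypothetical `Rows` interface standing in for Druid's `Firehose`: the wrapper reports exhaustion after `count` rows even though the delegate (like a live Kafka topic) never runs dry, which is what lets the indexing task in the test terminate deterministically.

```java
public class FixedCountSketch
{
  // Simplified stand-in for Firehose (hasMore/nextRow only).
  interface Rows
  {
    boolean hasMore();

    int nextRow();
  }

  // Same idea as FixedCountFirehoseFactory.connect(): pass rows through
  // until `count` of them have been handed out, then report exhaustion.
  static Rows limit(final Rows delegate, final int count)
  {
    return new Rows()
    {
      private int i = 0;

      @Override
      public boolean hasMore()
      {
        return i < count && delegate.hasMore();
      }

      @Override
      public int nextRow()
      {
        i++;
        return delegate.nextRow();
      }
    };
  }

  public static void main(String[] args)
  {
    Rows unbounded = new Rows()
    {
      private int n = 0;

      @Override
      public boolean hasMore()
      {
        return true; // never exhausted, like a live Kafka topic
      }

      @Override
      public int nextRow()
      {
        return n++;
      }
    };

    Rows capped = limit(unbounded, 3);
    while (capped.hasMore()) {
      System.out.println(capped.nextRow()); // prints 0, 1, 2
    }
  }
}
```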