From 66e183eeab234ecec24a98d3e57b6d543e8058dc Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 7 Feb 2018 11:19:25 -0800
Subject: [PATCH] Integration test for AppenderatorDriverRealtimeIndexTask
 (#5355)

---
 integration-tests/docker/Dockerfile           |   9 +-
 .../AbstractITRealtimeIndexTaskTest.java      | 178 ++++++++++++++++++
 ...penderatorDriverRealtimeIndexTaskTest.java | 144 ++++++++++++++
 .../indexer/ITRealtimeIndexTaskTest.java      | 147 ++------------
 ...a_realtime_appenderator_index_queries.json |  87 +++++++++
 ...edia_realtime_appenderator_index_task.json |  93 +++++++++
 6 files changed, 521 insertions(+), 137 deletions(-)
 create mode 100644 integration-tests/src/test/java/io/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
 create mode 100644 integration-tests/src/test/java/io/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
 create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json
 create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json

diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 88cadeb440a..9edc5a5d642 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -2,7 +2,8 @@ FROM imply/druiditbase
 
 # Setup metadata store
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \
     && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \
     && /etc/init.d/mysql stop
@@ -10,11 +11,13 @@ RUN /etc/init.d/mysql start \
 ADD lib/* /usr/local/druid/lib/
 
 # Add sample data
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
     && java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql io.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \
     && /etc/init.d/mysql stop
 ADD sample-data.sql sample-data.sql
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
     && cat sample-data.sql | mysql -u root druid \
     && /etc/init.d/mysql stop

diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
new file mode 100644
index 00000000000..499f4f1dad4
--- /dev/null
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.tests.indexer;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import io.druid.curator.discovery.ServerDiscoveryFactory;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.testing.IntegrationTestingConfig;
+import io.druid.testing.guice.TestClient;
+import io.druid.testing.utils.RetryUtil;
+import org.apache.commons.io.IOUtils;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Steps
+ * 1) Submit a realtime index task
+ * 2) Load data using EventReceiverFirehose
+ * 3) Run queries to verify that the ingested data is available for queries
+ * 4) Wait for handover of the segment to historical node
+ * 5) Query data (will be from historical node)
+ * 6) Disable and delete the created data segment
+ */
+public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
+{
+  static final String EVENT_RECEIVER_SERVICE_NAME = "eventReceiverServiceName";
+  static final String EVENT_DATA_FILE = "/indexer/wikipedia_realtime_index_data.json";
+
+  private static final Logger LOG = new Logger(AbstractITRealtimeIndexTaskTest.class);
+  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
+  static final int DELAY_BETWEEN_EVENTS_SECS = 4;
+  String taskID;
+  final String TIME_PLACEHOLDER = "YYYY-MM-DDTHH:MM:SS";
+  // format for putting datestamp into events
+  final DateTimeFormatter EVENT_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
+  // format for the querying interval
+  final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  DateTime dtFirst;    // timestamp of 1st event
+  DateTime dtLast;     // timestamp of last event
+  DateTime dtGroupBy;  // timestamp for expected response for groupBy query
+
+  @Inject
+  ServerDiscoveryFactory factory;
+  @Inject
+  @TestClient
+  HttpClient httpClient;
+
+  @Inject
+  IntegrationTestingConfig config;
+
+  void doTest() throws Exception
+  {
+    LOG.info("Starting test: ITRealtimeIndexTaskTest");
+    try {
+      // the task will run for 3 minutes and then shutdown itself
+      String task = setShutOffTime(
+          getTaskAsString(getTaskResource()),
+          DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
+      );
+      LOG.info("indexerSpec: [%s]\n", task);
+      taskID = indexer.submitTask(task);
+
+      // this posts 22 events, one every 4 seconds
+      // each event contains the current time as its timestamp except
+      // the timestamp for the 15th event is early enough that the event should be ignored
+      // the timestamp for the 18th event is 2 seconds earlier than the 17th
+      postEvents();
+
+      // sleep for a while to let the events be ingested
+      TimeUnit.SECONDS.sleep(5);
+
+      // put the timestamps into the query structure
+      String query_response_template = null;
+      InputStream is = ITRealtimeIndexTaskTest.class.getResourceAsStream(getQueriesResource());
+      if (null == is) {
+        throw new ISE("could not open query file: %s", getQueriesResource());
+      }
+      query_response_template = IOUtils.toString(is, "UTF-8");
+
+      String queryStr = query_response_template
+          .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
+          .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
+          .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
+          .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
+          .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
+          .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
+          .replace("%%POST_AG_REQUEST_START%%", INTERVAL_FMT.print(dtFirst))
+          .replace("%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
+          .replace("%%POST_AG_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0))
+          );
+
+      // should hit the queries all on realtime task or some on realtime task
+      // and some on historical. Which it is depends on where in the minute we were
+      // when we started posting events.
+      try {
+        this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+
+      // wait for the task to complete
+      indexer.waitUntilTaskCompletes(taskID);
+
+      // task should complete only after the segments are loaded by historical node
+      RetryUtil.retryUntil(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
+            }
+          },
+          true,
+          60000,
+          10,
+          "Real-time generated segments loaded"
+      );
+
+      // queries should be answered by historical
+      this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    finally {
+      unloadAndKillData(INDEX_DATASOURCE);
+    }
+  }
+
+  String setShutOffTime(String taskAsString, DateTime time)
+  {
+    return taskAsString.replace("#SHUTOFFTIME", time.toString());
+  }
+
+  String getRouterURL()
+  {
+    return StringUtils.format(
+        "%s/druid/v2?pretty",
+        config.getRouterUrl()
+    );
+  }
+
+  abstract String getTaskResource();
+
+  abstract String getQueriesResource();
+
+  abstract void postEvents() throws Exception;
+}
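The class above follows the template-method pattern: doTest() owns the shared submit/ingest/query/handoff/cleanup flow, and concrete tests supply only the task spec, the expected-queries resource, and the event-posting loop. A minimal sketch of a hypothetical subclass (the class name and resource paths below are illustrative, not part of this patch):

    package io.druid.tests.indexer;

    import io.druid.testing.guice.DruidTestModuleFactory;
    import org.testng.annotations.Guice;
    import org.testng.annotations.Test;

    // Illustrative only -- not part of this patch.
    @Guice(moduleFactory = DruidTestModuleFactory.class)
    public class ITExampleRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
    {
      @Test
      public void testRealtimeIndexTask() throws Exception
      {
        doTest(); // runs the shared submit/post/query/handoff/cleanup flow
      }

      @Override
      String getTaskResource()
      {
        return "/indexer/example_index_task.json"; // hypothetical task spec
      }

      @Override
      String getQueriesResource()
      {
        return "/indexer/example_index_queries.json"; // hypothetical expected queries
      }

      @Override
      void postEvents() throws Exception
      {
        // post events through the EventReceiverFirehose,
        // recording dtFirst/dtLast/dtGroupBy for the query templates
      }
    }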
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
new file mode 100644
index 00000000000..d7c074a8352
--- /dev/null
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.tests.indexer;
+
+import com.google.common.base.Throwables;
+import io.druid.curator.discovery.ServerDiscoverySelector;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.jackson.JacksonUtils;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.testing.clients.EventReceiverFirehoseTestClient;
+import io.druid.testing.guice.DruidTestModuleFactory;
+import io.druid.testing.utils.ServerDiscoveryUtil;
+import org.joda.time.DateTime;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * See {@link AbstractITRealtimeIndexTaskTest} for test details.
+ */
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAppenderatorDriverRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
+{
+  private static final Logger LOG = new Logger(ITAppenderatorDriverRealtimeIndexTaskTest.class);
+  private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_task.json";
+  private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_queries.json";
+
+  @Test
+  public void testRealtimeIndexTask() throws Exception
+  {
+    doTest();
+  }
+
+  @Override
+  public void postEvents() throws Exception
+  {
+    final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
+    eventReceiverSelector.start();
+    BufferedReader reader = null;
+    InputStreamReader isr = null;
+    try {
+      isr = new InputStreamReader(
+          ITRealtimeIndexTaskTest.class.getResourceAsStream(EVENT_DATA_FILE),
+          StandardCharsets.UTF_8
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    try {
+      reader = new BufferedReader(isr);
+      ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
+      // Use the host from the config file and the port announced in zookeeper
+      String host = config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();
+      LOG.info("Event Receiver Found at host [%s]", host);
+      EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(
+          host,
+          EVENT_RECEIVER_SERVICE_NAME,
+          jsonMapper,
+          httpClient,
+          smileMapper
+      );
+      // there are 22 lines in the file
+      int i = 1;
+      DateTime dt = DateTimes.nowUtc(); // timestamp used for sending each event
+      dtFirst = dt;
+      dtLast = dt;
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (i == 15) { // for the 15th line, use a time before the window
+          dt = dt.minusMinutes(10);
+          dtFirst = dt; // oldest timestamp
+        } else if (i == 16) { // remember this time to use in the expected response from the groupBy query
+          dtGroupBy = dt;
+        } else if (i == 18) { // use a time 6 seconds ago so it will be out of order
+          dt = dt.minusSeconds(6);
+        }
+        String event = line.replace(TIME_PLACEHOLDER, EVENT_FMT.print(dt));
+        LOG.info("sending event: [%s]\n", event);
+        Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
+        events.add(
+            this.jsonMapper.readValue(
+                event, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+            )
+        );
+        int eventsPosted = client.postEvents(events, this.jsonMapper, MediaType.APPLICATION_JSON);
+        if (eventsPosted != events.size()) {
+          throw new ISE("Event not posted");
+        }
+
+        try {
+          Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
+        }
+        catch (InterruptedException ex) {
+          /* nothing */
+        }
+        dtLast = dt; // latest timestamp
+        dt = DateTimes.nowUtc();
+        i++;
+      }
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+    finally {
+      reader.close();
+      eventReceiverSelector.stop();
+    }
+  }
+
+  @Override
+  String getTaskResource()
+  {
+    return REALTIME_TASK_RESOURCE;
+  }
+
+  @Override
+  String getQueriesResource()
+  {
+    return REALTIME_QUERIES_RESOURCE;
+  }
+}
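Both concrete tests feed the same 22-line data file through the EventReceiverFirehose, replacing the literal placeholder in each line with a freshly generated timestamp before posting. A self-contained sketch of that substitution, runnable outside the test harness (the sample event line is invented for illustration; only the placeholder and the format pattern come from the patch):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class TimestampTemplateDemo
    {
      public static void main(String[] args)
      {
        // same placeholder and format as TIME_PLACEHOLDER / EVENT_FMT above
        final String timePlaceholder = "YYYY-MM-DDTHH:MM:SS";
        final DateTimeFormatter eventFmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
        // invented sample line; the real file holds 22 such JSON events
        final String line =
            "{\"timestamp\": \"YYYY-MM-DDTHH:MM:SS\", \"page\": \"Gypsy Danger\", \"added\": 57}";
        // the test stamps each event with "now", with deliberate exceptions
        // for events 15 (10 minutes early) and 18 (6 seconds early)
        final DateTime dt = new DateTime(DateTimeZone.UTC);
        System.out.println(line.replace(timePlaceholder, eventFmt.print(dt)));
      }
    }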
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java
index 54d31540b20..fdac3ba3f3f 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java
@@ -20,167 +20,55 @@
 package io.druid.tests.indexer;
 
 import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
 import io.druid.curator.discovery.ServerDiscoverySelector;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
-import io.druid.testing.IntegrationTestingConfig;
 import io.druid.testing.clients.EventReceiverFirehoseTestClient;
 import io.druid.testing.guice.DruidTestModuleFactory;
-import io.druid.testing.guice.TestClient;
-import io.druid.testing.utils.RetryUtil;
 import io.druid.testing.utils.ServerDiscoveryUtil;
-import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.core.MediaType;
 import java.io.BufferedReader;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 /**
- * Steps
- * 1) Submit a realtime index task
- * 2) Load data using EventReceiverFirehose
- * 3) Run queries to verify that the ingested data is available for queries
- * 4) Wait for handover of the segment to historical node
- * 5) Query data (will be from historical node)
- * 6) Disable and delete the created data segment
+ * See {@link AbstractITRealtimeIndexTaskTest} for test details.
  */
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
+public class ITRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
 {
   private static final Logger LOG = new Logger(ITRealtimeIndexTaskTest.class);
   private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_index_task.json";
-  private static final String EVENT_RECEIVER_SERVICE_NAME = "eventReceiverServiceName";
-  private static final String EVENT_DATA_FILE = "/indexer/wikipedia_realtime_index_data.json";
   private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_index_queries.json";
-  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
-
-  private static final int DELAY_BETWEEN_EVENTS_SECS = 4;
-  private String taskID;
-  private final String TIME_PLACEHOLDER = "YYYY-MM-DDTHH:MM:SS";
-  // format for putting datestamp into events
-  private final DateTimeFormatter EVENT_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
-  // format for the querying interval
-  private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
-  // format for the expected timestamp in a query response
-  private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
-  private DateTime dtFirst;    // timestamp of 1st event
-  private DateTime dtLast;     // timestamp of last event
-  private DateTime dtGroupBy;  // timestamp for expected response for groupBy query
-
-  @Inject
-  ServerDiscoveryFactory factory;
-  @Inject
-  @TestClient
-  HttpClient httpClient;
-
-  @Inject
-  IntegrationTestingConfig config;
 
   @Test
   public void testRealtimeIndexTask() throws Exception
   {
-    LOG.info("Starting test: ITRealtimeIndexTaskTest");
-    try {
-      // the task will run for 3 minutes and then shutdown itself
-      String task = setShutOffTime(
-          getTaskAsString(REALTIME_TASK_RESOURCE),
-          DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
-      );
-      LOG.info("indexerSpec: [%s]\n", task);
-      taskID = indexer.submitTask(task);
-
-      // this posts 22 events, one every 4 seconds
-      // each event contains the current time as its timestamp except
-      // the timestamp for the 14th event is early enough that the event should be ignored
-      // the timestamp for the 18th event is 2 seconds earlier than the 17th
-      postEvents();
-
-      // sleep for a while to let the events be ingested
-      TimeUnit.SECONDS.sleep(5);
-
-      // put the timestamps into the query structure
-      String query_response_template = null;
-      InputStream is = ITRealtimeIndexTaskTest.class.getResourceAsStream(REALTIME_QUERIES_RESOURCE);
-      if (null == is) {
-        throw new ISE("could not open query file: %s", REALTIME_QUERIES_RESOURCE);
-      }
-      query_response_template = IOUtils.toString(is, "UTF-8");
-
-      String queryStr = query_response_template
-          .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
-          .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
-          .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
-          .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%POST_AG_REQUEST_START%%", INTERVAL_FMT.print(dtFirst))
-          .replace("%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
-          .replace("%%POST_AG_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0))
-          );
-
-      // should hit the queries all on realtime task or some on realtime task
-      // and some on historical. Which it is depends on where in the minute we were
-      // when we started posting events.
-      try {
-        this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
-      }
-      catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-
-      // wait for the task to complete
-      indexer.waitUntilTaskCompletes(taskID);
-
-      // task should complete only after the segments are loaded by historical node
-      RetryUtil.retryUntil(
-          new Callable<Boolean>()
-          {
-            @Override
-            public Boolean call() throws Exception
-            {
-              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
-            }
-          },
-          true,
-          60000,
-          10,
-          "Real-time generated segments loaded"
-      );
-
-      // queries should be answered by historical
-      this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
+    doTest();
   }
 
-  private String setShutOffTime(String taskAsString, DateTime time)
+  @Override
+  String getTaskResource()
   {
-    return taskAsString.replace("#SHUTOFFTIME", time.toString());
+    return REALTIME_TASK_RESOURCE;
   }
 
+  @Override
+  String getQueriesResource()
+  {
+    return REALTIME_QUERIES_RESOURCE;
+  }
+
+  @Override
   public void postEvents() throws Exception
   {
     final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
@@ -253,13 +141,4 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
       eventReceiverSelector.stop();
     }
   }
-
-  private String getRouterURL()
-  {
-    return StringUtils.format(
-        "%s/druid/v2?pretty",
-        config.getRouterUrl()
-    );
-  }
-
 }
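The queries file below ends with a groupBy whose having spec filters on a post-aggregation: added_count is multiplied by the constant 10, and only rows whose product exceeds 9000 survive, so the single expected row works out as 905 * 10 = 9050.0 > 9000. A trivial check of that arithmetic (values taken from the expected results below):

    public class HavingSpecSanityCheck
    {
      public static void main(String[] args)
      {
        final long addedCount = 905;                          // "Crimson Typhoon" row
        final double addedCountTimesTen = addedCount * 10.0;  // arithmetic post-aggregator: fn "*", constant 10
        // having: greaterThan 9000 -> 9050.0 passes, so exactly this row is expected
        System.out.println(addedCountTimesTen > 9000);        // true
      }
    }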
"name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "%%POST_AG_REQUEST_START%%/%%POST_AG_REQUEST_END%%" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "%%POST_AG_RESPONSE_TIMESTAMP%%", + "event" : { + "added_count_times_ten" : 9050.0, + "page" : "Crimson Typhoon", + "added_count" : 905, + "rows" : 1 + } + } ] + } +] diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json new file mode 100644 index 00000000000..91f39221082 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json @@ -0,0 +1,93 @@ +{ + "type": "index_realtime_appenderator", + "spec": { + "dataSchema": { + "dataSource": "wikipedia_index_test", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "segmentGranularity": "minute", + "queryGranularity": "second" + }, + "parser": { + "type": "map", + "parseSpec": { + "columns": [ + "timestamp", + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + "added", + "deleted", + "delta" + ], + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + } + }, + "ioConfig": { + "type": "realtime", + "firehose": { + "type": "timed", + "shutoffTime": "#SHUTOFFTIME", + "delegate": { + "type": "receiver", + "serviceName": "eventReceiverServiceName", + "bufferSize": 100000 + } + } + }, + "tuningConfig": { + "type": "realtime_appenderator", + "maxRowsInMemory": 1, + "intermediatePersistPeriod": "PT1M" + } + } +}