mirror of https://github.com/apache/druid.git
Integration test for AppenderatorDriverRealtimeIndexTask (#5355)
This commit is contained in:
parent eb17fba0e2
commit 66e183eeab
Dockerfile
@@ -2,7 +2,8 @@
 FROM imply/druiditbase

 # Setup metadata store
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \
   && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \
   && /etc/init.d/mysql stop
@@ -10,11 +11,13 @@ RUN /etc/init.d/mysql start \
 ADD lib/* /usr/local/druid/lib/

 # Add sample data
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
   && java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql io.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \
   && /etc/init.d/mysql stop
 ADD sample-data.sql sample-data.sql
-RUN /etc/init.d/mysql start \
+# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
+RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
   && cat sample-data.sql | mysql -u root druid \
   && /etc/init.d/mysql stop
AbstractITRealtimeIndexTaskTest.java (new file)
@@ -0,0 +1,178 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.druid.tests.indexer;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.TestClient;
import io.druid.testing.utils.RetryUtil;
import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Steps:
 * 1) Submit a realtime index task
 * 2) Load data using EventReceiverFirehose
 * 3) Run queries to verify that the ingested data is available for queries
 * 4) Wait for handover of the segment to historical node
 * 5) Query data (will be from historical node)
 * 6) Disable and delete the created data segment
 */
public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
{
  static final String EVENT_RECEIVER_SERVICE_NAME = "eventReceiverServiceName";
  static final String EVENT_DATA_FILE = "/indexer/wikipedia_realtime_index_data.json";

  private static final Logger LOG = new Logger(AbstractITRealtimeIndexTaskTest.class);
  private static final String INDEX_DATASOURCE = "wikipedia_index_test";

  static final int DELAY_BETWEEN_EVENTS_SECS = 4;
  String taskID;
  final String TIME_PLACEHOLDER = "YYYY-MM-DDTHH:MM:SS";
  // format for putting datestamp into events
  final DateTimeFormatter EVENT_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
  // format for the querying interval
  final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
  // format for the expected timestamp in a query response
  final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
  DateTime dtFirst;   // timestamp of first event
  DateTime dtLast;    // timestamp of last event
  DateTime dtGroupBy; // timestamp for expected response for groupBy query

  @Inject
  ServerDiscoveryFactory factory;
  @Inject
  @TestClient
  HttpClient httpClient;

  @Inject
  IntegrationTestingConfig config;

  void doTest() throws Exception
  {
    LOG.info("Starting test: ITRealtimeIndexTaskTest");
    try {
      // the task will run for 3 minutes and then shut itself down
      String task = setShutOffTime(
          getTaskAsString(getTaskResource()),
          DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
      );
      LOG.info("indexerSpec: [%s]\n", task);
      taskID = indexer.submitTask(task);

      // this posts 22 events, one every 4 seconds;
      // each event contains the current time as its timestamp, except that
      // the timestamp for the 14th event is early enough that the event should be ignored, and
      // the timestamp for the 18th event is 2 seconds earlier than the 17th
      postEvents();

      // sleep for a while to let the events be ingested
      TimeUnit.SECONDS.sleep(5);

      // put the timestamps into the query structure
      String query_response_template = null;
      InputStream is = ITRealtimeIndexTaskTest.class.getResourceAsStream(getQueriesResource());
      if (null == is) {
        throw new ISE("could not open query file: %s", getQueriesResource());
      }
      query_response_template = IOUtils.toString(is, "UTF-8");

      String queryStr = query_response_template
          .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
          .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
          .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
          .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
          .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
          .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
          .replace("%%POST_AG_REQUEST_START%%", INTERVAL_FMT.print(dtFirst))
          .replace("%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
          .replace("%%POST_AG_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0)));

      // the queries should hit either all on the realtime task, or some on the realtime task
      // and some on the historical; which it is depends on where in the minute we were
      // when we started posting events
      try {
        this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
      }
      catch (Exception e) {
        throw Throwables.propagate(e);
      }

      // wait for the task to complete
      indexer.waitUntilTaskCompletes(taskID);

      // the task should complete only after the segments are loaded by the historical node
      RetryUtil.retryUntil(
          new Callable<Boolean>()
          {
            @Override
            public Boolean call() throws Exception
            {
              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
            }
          },
          true,
          60000,
          10,
          "Real-time generated segments loaded"
      );

      // queries should now be answered by the historical
      this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
    finally {
      unloadAndKillData(INDEX_DATASOURCE);
    }
  }

  String setShutOffTime(String taskAsString, DateTime time)
  {
    return taskAsString.replace("#SHUTOFFTIME", time.toString());
  }

  String getRouterURL()
  {
    return StringUtils.format(
        "%s/druid/v2?pretty",
        config.getRouterUrl()
    );
  }

  abstract String getTaskResource();
  abstract String getQueriesResource();

  abstract void postEvents() throws Exception;
}
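The segment-handoff wait in doTest() above polls the coordinator through RetryUtil.retryUntil with an anonymous Callable. On a Java 8 toolchain the same call could be written with a lambda; a minimal sketch (same arguments as the committed call, only the Callable syntax changes — this is not the committed code):

// Sketch: lambda form of the segment-handoff wait shown above.
RetryUtil.retryUntil(
    () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
    true,
    60000,
    10,
    "Real-time generated segments loaded"
);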
ITAppenderatorDriverRealtimeIndexTaskTest.java (new file)
@@ -0,0 +1,144 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.druid.tests.indexer;

import com.google.common.base.Throwables;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.testing.clients.EventReceiverFirehoseTestClient;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.ServerDiscoveryUtil;
import org.joda.time.DateTime;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import javax.ws.rs.core.MediaType;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

/**
 * See {@link AbstractITRealtimeIndexTaskTest} for test details.
 */
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITAppenderatorDriverRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
{
  private static final Logger LOG = new Logger(ITAppenderatorDriverRealtimeIndexTaskTest.class);
  private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_task.json";
  private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_appenderator_index_queries.json";

  @Test
  public void testRealtimeIndexTask() throws Exception
  {
    doTest();
  }

  @Override
  public void postEvents() throws Exception
  {
    final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
    eventReceiverSelector.start();
    BufferedReader reader = null;
    InputStreamReader isr = null;
    try {
      isr = new InputStreamReader(
          ITRealtimeIndexTaskTest.class.getResourceAsStream(EVENT_DATA_FILE),
          StandardCharsets.UTF_8
      );
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
    try {
      reader = new BufferedReader(isr);
      ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
      // Use the host from the config file and the port announced in zookeeper
      String host = config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();
      LOG.info("Event Receiver Found at host [%s]", host);
      EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(
          host,
          EVENT_RECEIVER_SERVICE_NAME,
          jsonMapper,
          httpClient,
          smileMapper
      );
      // there are 22 lines in the file
      int i = 1;
      DateTime dt = DateTimes.nowUtc(); // timestamp used for sending each event
      dtFirst = dt;
      dtLast = dt;
      String line;
      while ((line = reader.readLine()) != null) {
        if (i == 15) { // for the 15th line, use a time before the window
          dt = dt.minusMinutes(10);
          dtFirst = dt; // oldest timestamp
        } else if (i == 16) { // remember this time to use in the expected response from the groupBy query
          dtGroupBy = dt;
        } else if (i == 18) { // use a time 6 seconds ago so it will be out of order
          dt = dt.minusSeconds(6);
        }
        String event = line.replace(TIME_PLACEHOLDER, EVENT_FMT.print(dt));
        LOG.info("sending event: [%s]\n", event);
        Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
        events.add(
            this.jsonMapper.readValue(
                event, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
            )
        );
        int eventsPosted = client.postEvents(events, this.jsonMapper, MediaType.APPLICATION_JSON);
        if (eventsPosted != events.size()) {
          throw new ISE("Event not posted");
        }

        try {
          Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
        }
        catch (InterruptedException ex) { /* nothing */ }
        dtLast = dt; // latest timestamp
        dt = DateTimes.nowUtc();
        i++;
      }
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
    finally {
      reader.close();
      eventReceiverSelector.stop();
    }
  }

  @Override
  String getTaskResource()
  {
    return REALTIME_TASK_RESOURCE;
  }

  @Override
  String getQueriesResource()
  {
    return REALTIME_QUERIES_RESOURCE;
  }
}
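The postEvents() above opens the event file and closes the reader in a finally block. The same resource handling could be expressed with try-with-resources; a minimal sketch of that alternative structure (not the committed code — the selector still needs its own finally because it is not AutoCloseable here):

final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
eventReceiverSelector.start();
try (BufferedReader reader = new BufferedReader(
    new InputStreamReader(
        ITRealtimeIndexTaskTest.class.getResourceAsStream(EVENT_DATA_FILE),
        StandardCharsets.UTF_8
    ))) {
  // wait for the event receiver and post the 22 events exactly as above;
  // the reader is closed automatically, even if posting fails
}
finally {
  eventReceiverSelector.stop();
}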
ITRealtimeIndexTaskTest.java
@@ -20,167 +20,55 @@
 package io.druid.tests.indexer;

 import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.curator.discovery.ServerDiscoveryFactory;
 import io.druid.curator.discovery.ServerDiscoverySelector;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
-import io.druid.testing.IntegrationTestingConfig;
 import io.druid.testing.clients.EventReceiverFirehoseTestClient;
 import io.druid.testing.guice.DruidTestModuleFactory;
-import io.druid.testing.guice.TestClient;
-import io.druid.testing.utils.RetryUtil;
 import io.druid.testing.utils.ServerDiscoveryUtil;
-import org.apache.commons.io.IOUtils;
 import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;

 import javax.ws.rs.core.MediaType;
 import java.io.BufferedReader;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;

 /**
- * Steps
- * 1) Submit a realtime index task
- * 2) Load data using EventReceiverFirehose
- * 3) Run queries to verify that the ingested data is available for queries
- * 4) Wait for handover of the segment to historical node
- * 5) Query data (will be from historical node)
- * 6) Disable and delete the created data segment
+ * See {@link AbstractITRealtimeIndexTaskTest} for test details.
  */
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
+public class ITRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
 {
   private static final Logger LOG = new Logger(ITRealtimeIndexTaskTest.class);
   private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_index_task.json";
-  private static final String EVENT_RECEIVER_SERVICE_NAME = "eventReceiverServiceName";
-  private static final String EVENT_DATA_FILE = "/indexer/wikipedia_realtime_index_data.json";
   private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_index_queries.json";
-  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
-
-  private static final int DELAY_BETWEEN_EVENTS_SECS = 4;
-  private String taskID;
-  private final String TIME_PLACEHOLDER = "YYYY-MM-DDTHH:MM:SS";
-  // format for putting datestamp into events
-  private final DateTimeFormatter EVENT_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
-  // format for the querying interval
-  private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
-  // format for the expected timestamp in a query response
-  private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
-  private DateTime dtFirst; // timestamp of 1st event
-  private DateTime dtLast; // timestamp of last event
-  private DateTime dtGroupBy; // timestamp for expected response for groupBy query
-
-  @Inject
-  ServerDiscoveryFactory factory;
-  @Inject
-  @TestClient
-  HttpClient httpClient;
-
-  @Inject
-  IntegrationTestingConfig config;

   @Test
   public void testRealtimeIndexTask() throws Exception
   {
-    LOG.info("Starting test: ITRealtimeIndexTaskTest");
-    try {
-      // the task will run for 3 minutes and then shutdown itself
-      String task = setShutOffTime(
-          getTaskAsString(REALTIME_TASK_RESOURCE),
-          DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
-      );
-      LOG.info("indexerSpec: [%s]\n", task);
-      taskID = indexer.submitTask(task);
-
-      // this posts 22 events, one every 4 seconds
-      // each event contains the current time as its timestamp except
-      // the timestamp for the 14th event is early enough that the event should be ignored
-      // the timestamp for the 18th event is 2 seconds earlier than the 17th
-      postEvents();
-
-      // sleep for a while to let the events be ingested
-      TimeUnit.SECONDS.sleep(5);
-
-      // put the timestamps into the query structure
-      String query_response_template = null;
-      InputStream is = ITRealtimeIndexTaskTest.class.getResourceAsStream(REALTIME_QUERIES_RESOURCE);
-      if (null == is) {
-        throw new ISE("could not open query file: %s", REALTIME_QUERIES_RESOURCE);
-      }
-      query_response_template = IOUtils.toString(is, "UTF-8");
-
-      String queryStr = query_response_template
-          .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
-          .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
-          .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
-          .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
-          .replace("%%POST_AG_REQUEST_START%%", INTERVAL_FMT.print(dtFirst))
-          .replace("%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
-          .replace("%%POST_AG_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0)));
-
-      // should hit the queries all on realtime task or some on realtime task
-      // and some on historical. Which it is depends on where in the minute we were
-      // when we started posting events.
-      try {
-        this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
-      }
-      catch (Exception e) {
-        throw Throwables.propagate(e);
-      }
-
-      // wait for the task to complete
-      indexer.waitUntilTaskCompletes(taskID);
-
-      // task should complete only after the segments are loaded by historical node
-      RetryUtil.retryUntil(
-          new Callable<Boolean>()
-          {
-            @Override
-            public Boolean call() throws Exception
-            {
-              return coordinator.areSegmentsLoaded(INDEX_DATASOURCE);
-            }
-          },
-          true,
-          60000,
-          10,
-          "Real-time generated segments loaded"
-      );
-
-      // queries should be answered by historical
-      this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    finally {
-      unloadAndKillData(INDEX_DATASOURCE);
-    }
+    doTest();
   }

-  private String setShutOffTime(String taskAsString, DateTime time)
+  @Override
+  String getTaskResource()
   {
-    return taskAsString.replace("#SHUTOFFTIME", time.toString());
+    return REALTIME_TASK_RESOURCE;
   }

+  @Override
+  String getQueriesResource()
+  {
+    return REALTIME_QUERIES_RESOURCE;
+  }
+
+  @Override
   public void postEvents() throws Exception
   {
     final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
@@ -253,13 +141,4 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
       eventReceiverSelector.stop();
     }
   }
-
-  private String getRouterURL()
-  {
-    return StringUtils.format(
-        "%s/druid/v2?pretty",
-        config.getRouterUrl()
-    );
-  }
-
 }
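After this refactoring, a further realtime-task variant only needs to supply its own resource paths and event-posting logic through the template-method hooks of the abstract base class. A hypothetical sketch (the class name and resource paths are made up for illustration, not part of this commit):

@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITSomeOtherRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest
{
  @Test
  public void testRealtimeIndexTask() throws Exception
  {
    doTest(); // shared flow from the abstract base class
  }

  @Override
  String getTaskResource()
  {
    return "/indexer/some_other_index_task.json"; // hypothetical resource
  }

  @Override
  String getQueriesResource()
  {
    return "/indexer/some_other_index_queries.json"; // hypothetical resource
  }

  @Override
  void postEvents() throws Exception
  {
    // post events to the firehose, as in the two concrete tests above
  }
}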
wikipedia_realtime_appenderator_index_queries.json (new file)
@@ -0,0 +1,87 @@
[
  {
    "description": "timeBoundary",
    "query": {
      "queryType": "timeBoundary",
      "dataSource": "wikipedia_index_test"
    },
    "expectedResults": [
      {
        "timestamp": "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
        "result": {
          "maxTime": "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
          "minTime": "%%TIMEBOUNDARY_RESPONSE_MINTIME%%"
        }
      }
    ]
  },
  {
    "description": "timeseries",
    "query": {
      "queryType": "timeseries",
      "dataSource": "wikipedia_index_test",
      "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
      "granularity": "all",
      "aggregations": [
        {"type": "longSum", "fieldName": "count", "name": "edit_count"},
        {"type": "doubleSum", "fieldName": "added", "name": "chars_added"}
      ]
    },
    "expectedResults": [
      {
        "timestamp": "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
        "result": {
          "chars_added": 1642.0,
          "edit_count": 22
        }
      }
    ]
  },
  {
    "description": "having spec on post aggregation",
    "query": {
      "queryType": "groupBy",
      "dataSource": "wikipedia_index_test",
      "granularity": "minute",
      "dimensions": [
        "page"
      ],
      "aggregations": [
        {
          "type": "count",
          "name": "rows"
        },
        {
          "type": "longSum",
          "fieldName": "added",
          "name": "added_count"
        }
      ],
      "postAggregations": [
        {
          "type": "arithmetic",
          "name": "added_count_times_ten",
          "fn": "*",
          "fields": [
            {"type": "fieldAccess", "name": "added_count", "fieldName": "added_count"},
            {"type": "constant", "name": "const", "value": 10}
          ]
        }
      ],
      "having": {"type": "greaterThan", "aggregation": "added_count_times_ten", "value": 9000},
      "intervals": [
        "%%POST_AG_REQUEST_START%%/%%POST_AG_REQUEST_END%%"
      ]
    },
    "expectedResults": [ {
      "version": "v1",
      "timestamp": "%%POST_AG_RESPONSE_TIMESTAMP%%",
      "event": {
        "added_count_times_ten": 9050.0,
        "page": "Crimson Typhoon",
        "added_count": 905,
        "rows": 1
      }
    } ]
  }
]
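The expected groupBy row can be sanity-checked by hand: the arithmetic post-aggregation multiplies added_count by the constant 10, and the having spec keeps only rows where the product exceeds 9000. A minimal standalone check of that arithmetic (illustration only, not part of the commit):

// Standalone check of the having-spec arithmetic in the expected result above.
public class HavingSpecArithmetic
{
  public static void main(String[] args)
  {
    long addedCount = 905;                            // longSum of "added" for page "Crimson Typhoon"
    double addedCountTimesTen = addedCount * 10.0;    // arithmetic post-aggregation with fn "*"
    boolean passesHaving = addedCountTimesTen > 9000; // greaterThan having spec
    System.out.printf("added_count_times_ten = %.1f, passes having: %b%n",
                      addedCountTimesTen, passesHaving); // prints 9050.0, true
  }
}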
wikipedia_realtime_appenderator_index_task.json (new file)
@@ -0,0 +1,93 @@
{
  "type": "index_realtime_appenderator",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia_index_test",
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "added",
          "fieldName": "added"
        },
        {
          "type": "doubleSum",
          "name": "deleted",
          "fieldName": "deleted"
        },
        {
          "type": "doubleSum",
          "name": "delta",
          "fieldName": "delta"
        }
      ],
      "granularitySpec": {
        "segmentGranularity": "minute",
        "queryGranularity": "second"
      },
      "parser": {
        "type": "map",
        "parseSpec": {
          "columns": [
            "timestamp",
            "page",
            "language",
            "user",
            "unpatrolled",
            "newPage",
            "robot",
            "anonymous",
            "namespace",
            "continent",
            "country",
            "region",
            "city",
            "added",
            "deleted",
            "delta"
          ],
          "timestampSpec": {
            "column": "timestamp",
            "format": "iso"
          },
          "dimensionsSpec": {
            "dimensions": [
              "page",
              "language",
              "user",
              "unpatrolled",
              "newPage",
              "robot",
              "anonymous",
              "namespace",
              "continent",
              "country",
              "region",
              "city"
            ]
          }
        }
      }
    },
    "ioConfig": {
      "type": "realtime",
      "firehose": {
        "type": "timed",
        "shutoffTime": "#SHUTOFFTIME",
        "delegate": {
          "type": "receiver",
          "serviceName": "eventReceiverServiceName",
          "bufferSize": 100000
        }
      }
    },
    "tuningConfig": {
      "type": "realtime_appenderator",
      "maxRowsInMemory": 1,
      "intermediatePersistPeriod": "PT1M"
    }
  }
}