Merge pull request #1786 from rasahner/testEnhancements

enhance test support and update realtime test to use serverTime
This commit is contained in:
Charles Allen 2015-10-15 14:32:23 -07:00
commit 28d7233b3e
18 changed files with 567 additions and 121 deletions

View File

@ -84,6 +84,26 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.testng.TestNG</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>test-jar</id>
<phase>package</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -30,15 +30,18 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import com.metamx.common.logger.Logger;
public class ConfigFileConfigProvider implements IntegrationTestingConfigProvider
{
private final static Logger LOG = new Logger(ConfigFileConfigProvider.class);
private String routerHost = "";
private String brokerHost = "";
private String historicalHost = "";
private String coordinatorHost = "";
private String indexerHost = "";
private String middleManagerHost = "";
private String zookeeperHosts = "";
private String zookeeperHosts = ""; // comma-separated list of host:port
private Map<String, String> props = null;
@JsonCreator
@ -61,10 +64,18 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
}
routerHost = props.get("router_host") + ":" + props.get("router_port");
brokerHost = props.get("broker_host") + ":" + props.get("broker_port");
historicalHost = props.get("historical_host") + ":" + props.get("historical_port");
coordinatorHost = props.get("coordinator_host") + ":" + props.get("coordinator_port");
indexerHost = props.get("indexer_host") + ":" + props.get("indexer_port");
middleManagerHost = props.get("middle_manager_host");
middleManagerHost = props.get("middlemanager_host");
zookeeperHosts = props.get("zookeeper_hosts");
LOG.info ("router: [%s]", routerHost);
LOG.info ("broker [%s]: ", brokerHost);
LOG.info ("coordinator: [%s]", coordinatorHost);
LOG.info ("overlord: [%s]", indexerHost);
LOG.info ("middle manager: [%s]", middleManagerHost);
LOG.info ("zookeepers: [%s]", zookeeperHosts);
}
@Override
@ -96,6 +107,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
return brokerHost;
}
@Override
public String getHistoricalHost()
{
return historicalHost;
}
@Override
public String getMiddleManagerHost()
{
@ -107,6 +124,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
{
return zookeeperHosts;
}
@Override
public String getProperty(String keyword)
{
return props.get(keyword);
}
};
}
}

View File

@ -58,6 +58,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
return dockerIp + ":8082";
}
@Override
public String getHistoricalHost()
{
return dockerIp + ":8083";
}
@Override
public String getMiddleManagerHost()
{
@ -69,6 +75,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
{
return dockerIp + ":2181";
}
@Override
public String getProperty(String prop)
{
throw new UnsupportedOperationException("DockerConfigProvider does not support getProperty()");
}
};
}
}

View File

@ -29,7 +29,11 @@ public interface IntegrationTestingConfig
public String getBrokerHost();
public String getHistoricalHost();
public String getMiddleManagerHost();
public String getZookeeperHosts();
public String getProperty(String prop);
}

View File

@ -23,7 +23,6 @@ import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
@ -40,7 +39,6 @@ import java.util.Map;
public class CoordinatorResourceTestClient
{
private final static Logger LOG = new Logger(CoordinatorResourceTestClient.class);
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final String coordinator;
@ -66,11 +64,16 @@ public class CoordinatorResourceTestClient
);
}
private String getLoadStatusURL()
{
return String.format("%s%s", getCoordinatorURL(), "loadstatus");
}
private Map<String, Integer> getLoadStatus()
{
Map<String, Integer> status = null;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, getCoordinatorURL() + "loadstatus?simple");
StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL());
status = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
@ -87,7 +90,7 @@ public class CoordinatorResourceTestClient
public boolean areSegmentsLoaded(String dataSource)
{
final Map<String, Integer> status = getLoadStatus();
return (status.containsKey(dataSource) && status.get(dataSource) == 0);
return (status.containsKey(dataSource) && status.get(dataSource) == 100.0);
}
public void unloadSegmentsForDataSource(String dataSource, Interval interval)
@ -135,7 +138,6 @@ public class CoordinatorResourceTestClient
return response;
}
catch (Exception e) {
LOG.error(e, "Exception while sending request");
throw Throwables.propagate(e);
}
}

View File

@ -67,10 +67,15 @@ public class QueryResourceTestClient
}
public List<Map<String, Object>> query(Query query)
{
return query(getBrokerURL(), query);
}
public List<Map<String, Object>> query(String url, Query query)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getBrokerURL())).setContent(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
), responseHandler

View File

@ -1,77 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.testing.utils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.testing.clients.QueryResourceTestClient;
import java.util.List;
import java.util.Map;
public class FromFileTestQueryHelper
{
public static Logger LOG = new Logger(FromFileTestQueryHelper.class);
private final QueryResourceTestClient queryClient;
private final ObjectMapper jsonMapper;
@Inject
FromFileTestQueryHelper(ObjectMapper jsonMapper, QueryResourceTestClient queryClient)
{
this.jsonMapper = jsonMapper;
this.queryClient = queryClient;
}
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
{
LOG.info("Starting query tests for [%s]", filePath);
List<QueryWithResults> queries =
jsonMapper.readValue(
FromFileTestQueryHelper.class.getResourceAsStream(filePath),
new TypeReference<List<QueryWithResults>>()
{
}
);
for (int i = 0; i < timesToRun; i++) {
LOG.info("Starting Iteration " + i);
boolean failed = false;
for (QueryWithResults queryWithResult : queries) {
LOG.info("Running Query " + queryWithResult.getQuery().getType());
List<Map<String, Object>> result = queryClient.query(queryWithResult.getQuery());
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults())) {
LOG.error(
"Failed while executing %s actualResults : %s",
queryWithResult,
jsonMapper.writeValueAsString(result)
);
failed = true;
} else {
LOG.info("Results Verified for Query " + queryWithResult.getQuery().getType());
}
}
if (failed) {
throw new ISE("one or more twitter queries failed");
}
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.testing.utils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.testing.clients.QueryResourceTestClient;
import io.druid.testing.IntegrationTestingConfig;
import java.util.List;
import java.util.Map;
public class TestQueryHelper
{
public static Logger LOG = new Logger(TestQueryHelper.class);
private final QueryResourceTestClient queryClient;
private final ObjectMapper jsonMapper;
private final String broker;
@Inject
TestQueryHelper(
ObjectMapper jsonMapper,
QueryResourceTestClient queryClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;
this.queryClient = queryClient;
this.broker = config.getBrokerHost();
}
public void testQueriesFromFile(String filePath, int timesToRun) throws Exception
{
testQueriesFromFile(getBrokerURL(), filePath, timesToRun);
}
public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
{
LOG.info("Starting query tests for [%s]", filePath);
List<QueryWithResults> queries =
jsonMapper.readValue(
TestQueryHelper.class.getResourceAsStream(filePath),
new TypeReference<List<QueryWithResults>>()
{
}
);
testQueries(url, queries, timesToRun);
}
public void testQueriesFromString(String str, int timesToRun) throws Exception
{
testQueriesFromString(getBrokerURL(), str, timesToRun);
}
public void testQueriesFromString(String url, String str, int timesToRun) throws Exception
{
LOG.info("Starting query tests using\n%s", str);
List<QueryWithResults> queries =
jsonMapper.readValue(
str,
new TypeReference<List<QueryWithResults>>()
{
}
);
testQueries(url, queries, timesToRun);
}
private void testQueries(String url, List<QueryWithResults> queries, int timesToRun) throws Exception
{
for (int i = 0; i < timesToRun; i++) {
LOG.info("Starting Iteration %d", i);
boolean failed = false;
for (QueryWithResults queryWithResult : queries) {
LOG.info("Running Query %s", queryWithResult.getQuery().getType());
List<Map<String, Object>> result = queryClient.query(url, queryWithResult.getQuery());
if (!QueryResultVerifier.compareResults(result, queryWithResult.getExpectedResults())) {
LOG.error(
"Failed while executing %s actualResults : %s",
queryWithResult,
jsonMapper.writeValueAsString(result)
);
failed = true;
} else {
LOG.info("Results Verified for Query %s", queryWithResult.getQuery().getType());
}
}
if (failed) {
throw new ISE("one or more queries failed");
}
}
}
private String getBrokerURL()
{
return String.format("http://%s/druid/v2?pretty", broker);
}
}

View File

@ -23,7 +23,7 @@ import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.clients.OverlordResourceTestClient;
import io.druid.testing.utils.FromFileTestQueryHelper;
import io.druid.testing.utils.TestQueryHelper;
import io.druid.testing.utils.RetryUtil;
import org.apache.commons.io.IOUtils;
import org.joda.time.Interval;
@ -46,11 +46,16 @@ public abstract class AbstractIndexerTest
@Smile
protected ObjectMapper smileMapper;
@Inject
protected FromFileTestQueryHelper queryHelper;
protected TestQueryHelper queryHelper;
protected void unloadAndKillData(final String dataSource) throws Exception
{
Interval interval = new Interval("2013-01-01T00:00:00.000Z/2013-12-01T00:00:00.000Z");
unloadAndKillData (dataSource, "2013-01-01T00:00:00.000Z", "2013-12-01T00:00:00.000Z");
}
protected void unloadAndKillData(final String dataSource, String start, String end) throws Exception
{
Interval interval = new Interval(start + "/" + end);
coordinator.unloadSegmentsForDataSource(dataSource, interval);
RetryUtil.retryUntilFalse(
new Callable<Boolean>()

View File

@ -20,6 +20,7 @@ package io.druid.tests.indexer;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
@ -30,20 +31,38 @@ import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import io.druid.testing.utils.ServerDiscoveryUtil;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.IOUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
* Steps
* 1) Submit a RealtimeIndexTask
* 2) Load Data using EventReceiverFirehose
* 3) Runs queries and verifies that the ingested data is available for queries
* 4) Waits for handover of the segment to historical node
* 5) Queries data from historical node and verifies handover
* 6) Removes and Delete the created Data Segment
* 1) Submit a realtime index task
* 2) Load data using EventReceiverFirehose
* 3) Run queries to verify that the ingested data is available for queries
* 4) Wait for handover of the segment to historical node
* 5) Query data (will be from historical node)
* 6) Disable and delete the created data segment
*/
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
@ -51,9 +70,22 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
private static final Logger LOG = new Logger(ITRealtimeIndexTaskTest.class);
private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_index_task.json";
private static final String EVENT_RECEIVER_SERVICE_NAME = "eventReceiverServiceName";
private static final String EVENT_DATA_FILE = "/indexer/wikipedia_index_data.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String EVENT_DATA_FILE = "/indexer/wikipedia_realtime_index_data.json";
private static final String REALTIME_QUERIES_RESOURCE = "/indexer/wikipedia_realtime_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final int DELAY_BETWEEN_EVENTS_SECS = 4;
private String taskID;
private final String TIME_PLACEHOLDER = "YYYY-MM-DDTHH:MM:SS";
// format for putting datestamp into events
private final DateTimeFormatter EVENT_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss");
// format for the querying interval
private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
// format for the expected timestamp in a query response
private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
private DateTime dtFirst; // timestamp of 1st event
private DateTime dtLast; // timestamp of last event
@Inject
ServerDiscoveryFactory factory;
@Inject
@ -66,20 +98,57 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
@Test
public void testRealtimeIndexTask() throws Exception
{
LOG.info("Starting test: ITRealtimeIndexTaskTest");
try {
// the task will run for 3 minutes and then shutdown itself
String task = setShutOffTime(
getTaskAsString(REALTIME_TASK_RESOURCE),
new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2))
new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
String taskID = indexer.submitTask(task);
LOG.info("indexerSpec: [%s]\n", task);
taskID = indexer.submitTask(task);
// this posts 22 events, one every 4 seconds
// each event contains the current time as its timestamp except
// the timestamp for the 14th event is early enough that the event should be ignored
// the timestamp for the 18th event is 2 seconds earlier than the 17th
postEvents();
// sleep for a while to let the events ingested
// sleep for a while to let the events be ingested
TimeUnit.SECONDS.sleep(5);
// should hit the queries on realtime task
this.queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
// put the timestamps into the query structure
String query_response_template = null;
InputStream is = ITRealtimeIndexTaskTest.class.getResourceAsStream(REALTIME_QUERIES_RESOURCE);
if (null == is) {
throw new ISE("could not open query file: %s", REALTIME_QUERIES_RESOURCE);
}
query_response_template = IOUtils.toString(is, "UTF-8");
String queryStr = query_response_template
.replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%POST_AG_REQUEST_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
.replace(
"%%POST_AG_RESPONSE_TIMESTAMP%%",
TIMESTAMP_FMT.print(dtLast.minusSeconds(24).withSecondOfMinute(0))
);
// should hit the queries all on realtime task or some on realtime task
// and some on historical. Which it is depends on where in the minute we were
// when we started posting events.
try {
this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
}
catch (Exception e) {
Throwables.propagate(e);
}
// wait for the task to complete
indexer.waitUntilTaskCompletes(taskID);
@ -99,15 +168,16 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
"Real-time generated segments loaded"
);
// run queries on historical nodes
this.queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
// queries should be answered by historical
this.queryHelper.testQueriesFromString(getRouterURL(), queryStr, 2);
}
catch (Exception e) {
e.printStackTrace();
Throwables.propagate(e);
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
String first = DateTimeFormat.forPattern("yyyy-MM-dd'T'00:00:00.000'Z'").print(dtFirst);
String last = DateTimeFormat.forPattern("yyyy-MM-dd'T'00:00:00.000'Z'").print(dtFirst.plusDays(1));
unloadAndKillData(INDEX_DATASOURCE, first, last);
}
}
@ -118,13 +188,23 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
public void postEvents() throws Exception
{
DateTimeZone zone = DateTimeZone.forID("UTC");
final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
eventReceiverSelector.start();
BufferedReader reader = null;
InputStreamReader isr = null;
try {
isr = new InputStreamReader(ITRealtimeIndexTaskTest.class.getResourceAsStream(EVENT_DATA_FILE));
}
catch (Exception e) {
Throwables.propagate(e);
}
try {
reader = new BufferedReader(isr);
ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
// Access the docker VM mapped host and port instead of service announced in zookeeper
// Use the host from the config file and the port announced in zookeeper
String host = config.getMiddleManagerHost() + ":" + eventReceiverSelector.pick().getPort();
LOG.info("Event Receiver Found at host %s", host);
LOG.info("Event Receiver Found at host [%s]", host);
EventReceiverFirehoseTestClient client = new EventReceiverFirehoseTestClient(
host,
EVENT_RECEIVER_SERVICE_NAME,
@ -132,10 +212,57 @@ public class ITRealtimeIndexTaskTest extends AbstractIndexerTest
httpClient,
smileMapper
);
client.postEventsFromFile(EVENT_DATA_FILE);
// there are 22 lines in the file
int i = 1;
DateTime dt = new DateTime(zone); // timestamp used for sending each event
dtFirst = dt; // timestamp of 1st event
dtLast = dt; // timestamp of last event
String line;
while ((line = reader.readLine()) != null) {
if (i == 15) { // for the 15th line, use a time before the window
dt = dt.minusMinutes(10);
} else if (i == 18) { // use a time 6 seconds ago so it will be out of order
dt = dt.minusSeconds(6);
}
String event = line.replace(TIME_PLACEHOLDER, EVENT_FMT.print(dt));
LOG.info("sending event: [%s]\n", event);
Collection<Map<String, Object>> events = new ArrayList<Map<String, Object>>();
events.add(
(Map<String, Object>) this.jsonMapper.readValue(
event, new TypeReference<Map<String, Object>>()
{
}
)
);
int eventsPosted = client.postEvents(events, this.jsonMapper, MediaType.APPLICATION_JSON);
if (eventsPosted != events.size()) {
throw new ISE("Event not posted");
}
try {
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
}
catch (InterruptedException ex) { /* nothing */ }
dtLast = dt;
dt = new DateTime(zone);
i++;
}
}
catch (Exception e) {
Throwables.propagate(e);
}
finally {
reader.close();
eventReceiverSelector.stop();
}
}
private String getRouterURL()
{
return String.format(
"http://%s/druid/v2?pretty",
config.getRouterHost()
);
}
}

View File

@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public class ITUnionQueryTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITUnionQueryTest.class);
private static final String REALTIME_TASK_RESOURCE = "/indexer/wikipedia_realtime_index_task.json";
private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
private static final String UNION_DATA_FILE = "/indexer/wikipedia_index_data.json";
private static final String UNION_QUERIES_RESOURCE = "/indexer/union_queries.json";
@ -66,7 +66,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
try {
// Load 4 datasources with same dimensions
String task = setShutOffTime(
getTaskAsString(REALTIME_TASK_RESOURCE),
getTaskAsString(UNION_TASK_RESOURCE),
new DateTime(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
List<String> taskIDs = Lists.newArrayList();

View File

@ -20,7 +20,7 @@ package io.druid.tests.query;
import com.google.inject.Inject;
import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.FromFileTestQueryHelper;
import io.druid.testing.utils.TestQueryHelper;
import io.druid.testing.utils.RetryUtil;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
@ -36,12 +36,12 @@ public class ITTwitterQueryTest
@Inject
CoordinatorResourceTestClient coordinatorClient;
@Inject
private FromFileTestQueryHelper queryHelper;
private TestQueryHelper queryHelper;
@BeforeMethod
public void before()
{
// ensure that the segments twitter segments are loaded completely
// ensure that the twitter segments are loaded completely
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{

View File

@ -20,7 +20,7 @@ package io.druid.tests.query;
import com.google.inject.Inject;
import io.druid.testing.clients.CoordinatorResourceTestClient;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.FromFileTestQueryHelper;
import io.druid.testing.utils.TestQueryHelper;
import io.druid.testing.utils.RetryUtil;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
@ -36,12 +36,12 @@ public class ITWikipediaQueryTest
@Inject
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private FromFileTestQueryHelper queryHelper;
private TestQueryHelper queryHelper;
@BeforeMethod
public void before()
{
// ensure that twitter segments are loaded completely
// ensure that wikipedia segments are loaded completely
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{

View File

@ -0,0 +1,22 @@
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 1, "deleted": 1, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 3, "deleted": 3, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 5, "deleted": 5, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 7, "deleted": 3, "delta": 4}
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 11, "deleted": 11, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 13, "deleted": 13, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 17, "deleted": 17, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 19, "deleted": 19, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 23, "deleted": 23, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 29, "deleted": 31, "delta": -1}
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 31, "deleted": 31, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 37, "deleted": 37, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 41, "deleted": 41, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 43, "deleted": 43, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 47, "deleted": 47, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 59, "deleted": 59, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 61, "deleted": 61, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 67, "deleted": 67, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 71, "deleted": 71, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 73, "deleted": 73, "delta": 0},
{"timestamp": "YYYY-MM-DDTHH:MM:SSZ", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 79, "deleted": 79, "delta": 0}

View File

@ -0,0 +1,87 @@
[
{
"description": "timeBoundary",
"query": {
"queryType":"timeBoundary",
"dataSource":"wikipedia_index_test"
},
"expectedResults":[
{
"timestamp":"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
"result": {
"maxTime" : "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
"minTime":"%%TIMEBOUNDARY_RESPONSE_MINTIME%%"
}
}
]
},
{
"description": "timeseries",
"query": {
"queryType": "timeseries",
"dataSource": "wikipedia_index_test",
"intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
"granularity": "all",
"aggregations": [
{"type": "longSum", "fieldName": "count", "name": "edit_count"},
{"type": "doubleSum", "fieldName": "added", "name": "chars_added"}
]
},
"expectedResults": [
{
"timestamp" : "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
"result" : {
"chars_added" : 1595.0,
"edit_count" : 21
}
}
]
},
{
"description":"having spec on post aggregation",
"query":{
"queryType":"groupBy",
"dataSource":"wikipedia_index_test",
"granularity":"minute",
"dimensions":[
"page"
],
"aggregations":[
{
"type":"count",
"name":"rows"
},
{
"type":"longSum",
"fieldName":"added",
"name":"added_count"
}
],
"postAggregations": [
{
"type":"arithmetic",
"name":"added_count_times_ten",
"fn":"*",
"fields":[
{"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"},
{"type":"constant", "name":"const", "value":10}
]
}
],
"having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000},
"intervals":[
"%%POST_AG_REQUEST_START%%/%%POST_AG_REQUEST_END%%"
]
},
"expectedResults":[ {
"version" : "v1",
"timestamp" : "%%POST_AG_RESPONSE_TIMESTAMP%%",
"event" : {
"added_count_times_ten" : 9050.0,
"page" : "Crimson Typhoon",
"added_count" : 905,
"rows" : 1
}
} ]
}
]

View File

@ -25,7 +25,7 @@
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"segmentGranularity": "minute",
"queryGranularity": "second"
},
"parser": {
@ -90,7 +90,7 @@
"intermediatePersistPeriod": "PT1M",
"windowPeriod": "PT1M",
"rejectionPolicy": {
"type": "none"
"type": "serverTime"
}
}
}

View File

@ -0,0 +1,97 @@
{
"type": "index_realtime",
"spec": {
"dataSchema": {
"dataSource": "wikipedia_index_test",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second"
},
"parser": {
"type": "map",
"parseSpec": {
"columns": [
"timestamp",
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
"added",
"deleted",
"delta"
],
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
}
}
}
},
"ioConfig": {
"type": "realtime",
"firehose": {
"type": "timed",
"shutoffTime": "#SHUTOFFTIME",
"delegate": {
"type": "receiver",
"serviceName": "eventReceiverServiceName",
"bufferSize": 100000
}
}
},
"tuningConfig": {
"type": "realtime",
"maxRowsInMemory": 1,
"intermediatePersistPeriod": "PT1M",
"windowPeriod": "PT1M",
"rejectionPolicy": {
"type": "none"
}
}
}
}

View File

@ -373,7 +373,7 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 180000
}
},
"expectedResults": [
@ -488,7 +488,7 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 120000
}
},
"expectedResults": [
@ -609,7 +609,7 @@
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 60000
"timeout": 120000
}
},
"expectedResults": [