Wait until all data sources are ready for querying in ITUnionQueryTest (#4362)

This commit is contained in:
Jihoon Son 2017-06-04 09:02:12 +09:00 committed by Fangjin Yang
parent ebabe14fbe
commit ba816063cb
3 changed files with 47 additions and 7 deletions

View File

@ -21,10 +21,15 @@ package io.druid.testing.utils;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.query.Druids;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.clients.QueryResourceTestClient; import io.druid.testing.clients.QueryResourceTestClient;
@ -118,4 +123,27 @@ public class TestQueryHelper
{ {
return String.format("%s/druid/v2?pretty", broker); return String.format("%s/druid/v2?pretty", broker);
} }
@SuppressWarnings("unchecked")
public int countRows(String dataSource, String interval)
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(dataSource)
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory("rows", "count")
)
).granularity(Granularities.ALL)
.intervals(interval)
.build();
List<Map<String, Object>> results = queryClient.query(getBrokerURL(), query);
if (results.isEmpty()) {
return 0;
} else {
Map<String, Object> map = (Map<String, Object>) results.get(0).get("result");
return (Integer) map.get("rows");
}
}
} }

View File

@ -21,7 +21,6 @@ package io.druid.tests.indexer;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.clients.ClientInfoResourceTestClient; import io.druid.testing.clients.ClientInfoResourceTestClient;
@ -66,7 +65,7 @@ public class ITIndexerTest extends AbstractIndexerTest
Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot")); Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot"));
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); LOG.error(e, "Error while testing");
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
finally { finally {

View File

@ -86,8 +86,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest
postEvents(i); postEvents(i);
} }
// sleep for a while to let the events ingested // wait until all events are ingested
TimeUnit.SECONDS.sleep(5); RetryUtil.retryUntil(
() -> {
for (int i = 0; i < numTasks; i++) {
if (queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01") < 5) {
return false;
}
}
return true;
},
true,
1000,
100,
"Waiting all events are ingested"
);
// should hit the queries on realtime task // should hit the queries on realtime task
LOG.info("Running Union Queries.."); LOG.info("Running Union Queries..");
@ -110,7 +123,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
} }
}, },
true, true,
60000, 10000,
10, 10,
"Real-time generated segments loaded" "Real-time generated segments loaded"
); );
@ -120,7 +133,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
} }
catch (Exception e) { catch (Exception e) {
e.printStackTrace(); LOG.error(e, "Error while testing");
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
finally { finally {