diff --git a/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java b/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java index f0369d31bc5..bb00c7dcf70 100644 --- a/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java +++ b/integration-tests/src/main/java/io/druid/testing/utils/TestQueryHelper.java @@ -21,10 +21,15 @@ package io.druid.testing.utils; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.inject.Inject; - import io.druid.java.util.common.ISE; +import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; +import io.druid.query.Druids; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.timeseries.TimeseriesQuery; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.clients.QueryResourceTestClient; @@ -118,4 +123,27 @@ public class TestQueryHelper { return String.format("%s/druid/v2?pretty", broker); } + + @SuppressWarnings("unchecked") + public int countRows(String dataSource, String interval) + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(dataSource) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("rows", "count") + ) + ).granularity(Granularities.ALL) + .intervals(interval) + .build(); + + List> results = queryClient.query(getBrokerURL(), query); + if (results.isEmpty()) { + return 0; + } else { + Map map = (Map) results.get(0).get("result"); + + return (Integer) map.get("rows"); + } + } } diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITIndexerTest.java index ef5328fc3db..defa334251c 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITIndexerTest.java @@ -21,7 +21,6 @@ package io.druid.tests.indexer; import com.google.common.base.Throwables; import com.google.inject.Inject; - import io.druid.java.util.common.logger.Logger; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.clients.ClientInfoResourceTestClient; @@ -66,7 +65,7 @@ public class ITIndexerTest extends AbstractIndexerTest Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot")); } catch (Exception e) { - e.printStackTrace(); + LOG.error(e, "Error while testing"); throw Throwables.propagate(e); } finally { diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java index e3ca8726df6..dbdc49f0107 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java @@ -86,8 +86,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest postEvents(i); } - // sleep for a while to let the events ingested - TimeUnit.SECONDS.sleep(5); + // wait until all events are ingested + RetryUtil.retryUntil( + () -> { + for (int i = 0; i < numTasks; i++) { + if (queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01") < 5) { + return false; + } + } + return true; + }, + true, + 1000, + 100, + "Waiting all events are ingested" + ); // should hit the queries on realtime task LOG.info("Running Union Queries.."); @@ -110,7 +123,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest } }, true, - 60000, + 10000, 10, "Real-time generated segments loaded" ); @@ -120,7 +133,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest } catch (Exception e) { - e.printStackTrace(); + LOG.error(e, "Error while testing"); throw Throwables.propagate(e); } finally {