Wait for datasource to be ready for SQL in integration tests (#12189)

* Wait for datasource to be ready for SQL in integration tests

* add limit to the check query
This commit is contained in:
Jihoon Son 2022-01-25 10:14:26 -08:00 committed by GitHub
parent 2b32d86f3b
commit 20347e0c86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 93 additions and 30 deletions

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testing.utils;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
public final class DataLoaderHelper
{
@Inject
private SqlTestQueryHelper sqlTestQueryHelper;
@Inject
private CoordinatorResourceTestClient coordinator;
public void waitUntilDatasourceIsReady(String datasource)
{
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(datasource),
StringUtils.format(
"Segment Load for datasource [%s]",
datasource
)
);
ITRetryUtil.retryUntilTrue(
() -> sqlTestQueryHelper.isDatasourceLoadedInSQL(datasource),
StringUtils.format("Waiting for [%s] to be ready for SQL queries", datasource)
);
}
}

View File

@ -22,11 +22,14 @@ package org.apache.druid.testing.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
public class SqlTestQueryHelper extends AbstractTestQueryHelper<SqlQueryWithResults>
{
private static final Logger LOG = new Logger(SqlTestQueryHelper.class);
@Inject
public SqlTestQueryHelper(
@ -43,4 +46,27 @@ public class SqlTestQueryHelper extends AbstractTestQueryHelper<SqlQueryWithResu
{
return StringUtils.format("%s/druid/v2/sql", schemeAndHost);
}
public boolean isDatasourceLoadedInSQL(String datasource)
{
final SqlQuery query = new SqlQuery(
"SELECT 1 FROM \"" + datasource + "\" LIMIT 1",
null,
false,
false,
false,
null,
null
);
try {
//noinspection unchecked
queryClient.query(getQueryURL(broker), query);
return true;
}
catch (Exception e) {
LOG.debug(e, "Check query failed");
return false;
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
@ -59,6 +60,9 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
@Inject
SqlTestQueryHelper queryHelper;
@Inject
DataLoaderHelper dataLoaderHelper;
@Inject
@TestClient
HttpClient httpClient;
@ -94,9 +98,7 @@ public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE);
indexer.submitTask(taskJson);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(BROADCAST_JOIN_DATASOURCE);
// query metadata until druid schema is refreshed and datasource is available joinable
ITRetryUtil.retryUntilTrue(

View File

@ -26,9 +26,8 @@ import org.apache.druid.https.SSLClientConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@ -77,7 +76,7 @@ public class ITJdbcQueryTest
SSLClientConfig sslConfig;
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private DataLoaderHelper dataLoaderHelper;
@BeforeMethod
public void before()
@ -108,9 +107,7 @@ public class ITJdbcQueryTest
)
};
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
}
@Test

View File

@ -24,9 +24,8 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
@ -69,7 +68,7 @@ public class ITQueryErrorTest
private static final String SQL_PLAN_FAILURE_RESOURCE = "/queries/sql_plan_failure_query.json";
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private DataLoaderHelper dataLoaderHelper;
@Inject
private TestQueryHelper queryHelper;
@Inject
@ -81,9 +80,7 @@ public class ITQueryErrorTest
public void before()
{
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
}
@Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*")

View File

@ -30,10 +30,9 @@ import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -61,7 +60,7 @@ public class ITSqlCancelTest
private static final int NUM_QUERIES = 3;
@Inject
private CoordinatorResourceTestClient coordinatorClient;
private DataLoaderHelper dataLoaderHelper;
@Inject
private SqlTestQueryHelper sqlHelper;
@Inject
@ -75,9 +74,7 @@ public class ITSqlCancelTest
public void before()
{
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
}
@Test

View File

@ -21,9 +21,8 @@ package org.apache.druid.tests.query;
import com.google.inject.Inject;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.testng.annotations.BeforeMethod;
@ -39,7 +38,7 @@ public class ITSystemTableQueryTest
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/sys_queries.json";
@Inject
CoordinatorResourceTestClient coordinatorClient;
DataLoaderHelper dataLoaderHelper;
@Inject
private SqlTestQueryHelper queryHelper;
@Inject
@ -49,14 +48,10 @@ public class ITSystemTableQueryTest
public void before()
{
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
// ensure that the twitter segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(TWITTER_DATA_SOURCE), "twitter segment load"
);
dataLoaderHelper.waitUntilDatasourceIsReady(TWITTER_DATA_SOURCE);
}
@Test