Add sleep function for testing (#11626)

* Add sleep function for testing

* sql function

* javadoc
This commit is contained in:
Jihoon Son 2021-08-24 00:30:31 -07:00 committed by GitHub
parent 59e560e24d
commit 78b4be467e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 179 additions and 1 deletions

View File

@ -19,6 +19,7 @@
package org.apache.druid.math.expr;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
@ -27,6 +28,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.math.expr.Expr.InputBindingInspector;
import org.apache.druid.math.expr.Expr.ObjectBinding;
import org.apache.druid.math.expr.vector.CastToTypeVectorProcessor;
import org.apache.druid.math.expr.vector.ExprVectorProcessor;
import org.apache.druid.math.expr.vector.VectorMathProcessors;
@ -3639,4 +3642,56 @@ public interface Function
return HumanReadableBytes.UnitSystem.DECIMAL;
}
}
/**
* This function makes the current thread sleep for the given amount of seconds.
* Fractional-second delays can be specified.
*
* This function is applied per row. The actual query time can vary depending on how much parallelism is used
* for the query. As it does not provide consistent sleep time, this function should be used only for testing
* when you want to keep a certain query running during the test.
*/
@VisibleForTesting
class Sleep implements Function
{
@Override
public String name()
{
return "sleep";
}
@Override
public ExprEval apply(List<Expr> args, ObjectBinding bindings)
{
ExprEval eval = args.get(0).eval(bindings);
try {
if (!eval.isNumericNull()) {
double seconds = eval.asDouble();
if (seconds > 0) {
Thread.sleep((long) (seconds * 1000));
}
}
return ExprEval.of(null);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void validateArguments(List<Expr> args)
{
if (args.size() != 1) {
throw new IAE("Function[%s] needs 1 argument", name());
}
}
@Nullable
@Override
public ExprType getOutputType(InputBindingInspector inspector, List<Expr> args)
{
return ExprType.STRING;
}
}
}

View File

@ -783,6 +783,36 @@ public class FunctionTest extends InitializedNullHandlingTest
assertExpr("repeat(nonexistent, 10)", null);
}
@Test
public void testSleep()
{
assertExpr("sleep(1)", null);
assertExpr("sleep(0.5)", null);
assertExpr("sleep(null)", null);
assertExpr("sleep(0)", null);
assertExpr("sleep(-1)", null);
assertTimeElapsed("sleep(1)", 1000);
assertTimeElapsed("sleep(0.5)", 500);
assertTimeElapsed("sleep(null)", 0);
assertTimeElapsed("sleep(0)", 0);
assertTimeElapsed("sleep(-1)", 0);
}
private void assertTimeElapsed(String expression, long expectedTimeElapsedMs)
{
final long detla = 50;
final long before = System.currentTimeMillis();
final Expr expr = Parser.parse(expression, ExprMacroTable.nil());
expr.eval(bindings).value();
final long after = System.currentTimeMillis();
final long elapsed = after - before;
Assert.assertTrue(
StringUtils.format("Expected [%s], but actual elapsed was [%s]", expectedTimeElapsedMs, elapsed),
elapsed >= expectedTimeElapsedMs
&& elapsed < expectedTimeElapsedMs + detla
);
}
private void assertExpr(final String expression, @Nullable final Object expectedResult)
{
@ -793,7 +823,6 @@ public class FunctionTest extends InitializedNullHandlingTest
final Expr roundTrip = Parser.parse(exprNoFlatten.stringify(), ExprMacroTable.nil());
Assert.assertEquals(expr.stringify(), expectedResult, roundTrip.eval(bindings).value());
final Expr roundTripFlatten = Parser.parse(expr.stringify(), ExprMacroTable.nil());
Assert.assertEquals(expr.stringify(), expectedResult, roundTripFlatten.eval(bindings).value());

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.expression.builtin;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
/**
* A SQL operator conversion for the {@link org.apache.druid.math.expr.Function.Sleep} expression.
* The expression is currently evaluated during the query planning when the given argument is a number literal.
*/
public class SleepOperatorConversion implements SqlOperatorConversion
{
private static final SqlFunction SQL_FUNCTION = OperatorConversions
.operatorBuilder("SLEEP")
.operandTypes(SqlTypeFamily.NUMERIC)
.requiredOperands(1)
.returnTypeNullable(SqlTypeName.VARCHAR) // always null
.functionCategory(SqlFunctionCategory.TIMEDATE)
.build();
@Override
public SqlOperator calciteOperator()
{
return SQL_FUNCTION;
}
@Nullable
@Override
public DruidExpression toDruidExpression(PlannerContext plannerContext, RowSignature rowSignature, RexNode rexNode)
{
return OperatorConversions.convertCall(plannerContext, rowSignature, rexNode, "sleep");
}
}

View File

@ -97,6 +97,7 @@ import org.apache.druid.sql.calcite.expression.builtin.RepeatOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ReverseOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.RightOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.RoundOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.SleepOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.StringFormatOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.StringToArrayOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.StrposOperatorConversion;
@ -168,6 +169,7 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new TimeParseOperatorConversion())
.add(new TimeShiftOperatorConversion())
.add(new TimestampToMillisOperatorConversion())
.add(new SleepOperatorConversion())
.build();
private static final List<SqlOperatorConversion> STRING_OPERATOR_CONVERSIONS =

View File

@ -18858,4 +18858,34 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ImmutableList.of()
);
}
@Test
public void testSleepFunction() throws Exception
{
testQuery(
"SELECT sleep(m1) from foo where m1 < 2.0",
ImmutableList.of(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("foo"))
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
new ExpressionVirtualColumn(
"v0",
"sleep(\"m1\")",
ValueType.STRING,
ExprMacroTable.nil()
)
)
.columns("v0")
.filters(new BoundDimFilter("m1", null, "2.0", null, true, null, null, StringComparators.NUMERIC))
.resultFormat(ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{NullHandling.replaceWithDefault() ? "" : null}
)
);
}
}