SQL: ConstantProcessor can now handle NamedWriteable (#39876)

Enhance ConstantProcessor to properly serialize complex objects
(Intervals) that have their own custom serialization/deserialization
mechanism

Fix #39875

(cherry picked from commit ed8a1f9340673e69a44ea7a89679cadb4762e43d)
This commit is contained in:
Costin Leau 2019-03-11 11:50:10 +02:00 committed by Costin Leau
parent 471aa6a16a
commit a079b9fd6d
4 changed files with 51 additions and 5 deletions

View File

@ -107,7 +107,7 @@ public abstract class SpecBaseIntegrationTestCase extends JdbcIntegrationTestCas
}
protected int fetchSize() {
return between(1, 500);
return between(1, 150);
}
// TODO: use UTC for now until deciding on a strategy for handling date extraction

View File

@ -346,6 +346,22 @@ SELECT YEAR(NOW() - INTERVAL 2 YEARS) / 1000 AS result;
2
;
dateAndIntervalPaginated
SELECT YEAR(birth_date - INTERVAL 2 YEARS) / 1000 AS result FROM test_emp ORDER BY birth_date LIMIT 10;
result
---------------
1
1
1
1
1
1
1
1
1
1
;
currentTimestampFilter
SELECT first_name FROM test_emp WHERE hire_date > NOW() - INTERVAL 100 YEARS ORDER BY first_name ASC LIMIT 10;

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.expression.gen.processor;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -16,17 +17,40 @@ public class ConstantProcessor implements Processor {
public static String NAME = "c";
private final Object constant;
private final boolean namedWriteable;
private final Class<?> clazz;
public ConstantProcessor(Object value) {
this.constant = value;
this.namedWriteable = value instanceof NamedWriteable;
this.clazz = namedWriteable ? value.getClass() : null;
}
@SuppressWarnings("unchecked")
public ConstantProcessor(StreamInput in) throws IOException {
constant = in.readGenericValue();
namedWriteable = in.readBoolean();
if (namedWriteable) {
try {
clazz = ConstantProcessor.class.getClassLoader().loadClass(in.readString());
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
constant = in.readNamedWriteable((Class<NamedWriteable>) clazz);
} else {
clazz = null;
constant = in.readGenericValue();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericValue(constant);
out.writeBoolean(namedWriteable);
if (namedWriteable) {
out.writeString(constant.getClass().getName());
out.writeNamedWriteable((NamedWriteable) constant);
} else {
out.writeGenericValue(constant);
}
}
@Override

View File

@ -7,9 +7,12 @@ package org.elasticsearch.xpack.sql.expression.gen.processor;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.sql.expression.gen.processor.ConstantProcessor;
import org.elasticsearch.xpack.sql.expression.literal.IntervalDayTime;
import org.elasticsearch.xpack.sql.type.DataType;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class ConstantProcessorTests extends AbstractWireSerializingTestCase<ConstantProcessor> {
public static ConstantProcessor randomConstantProcessor() {
@ -28,7 +31,10 @@ public class ConstantProcessorTests extends AbstractWireSerializingTestCase<Cons
@Override
protected ConstantProcessor mutateInstance(ConstantProcessor instance) throws IOException {
return new ConstantProcessor(randomValueOtherThan(instance.process(null), () -> randomAlphaOfLength(5)));
return new ConstantProcessor(randomValueOtherThan(instance.process(null),
() -> new IntervalDayTime(Duration.ofSeconds(
randomLongBetween(TimeUnit.SECONDS.convert(3, TimeUnit.HOURS), TimeUnit.SECONDS.convert(23, TimeUnit.HOURS))),
DataType.INTERVAL_DAY_TO_SECOND)));
}
public void testApply() {