From 187e21afaea8495d94ed780254beef9b12536757 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Mon, 21 Oct 2024 11:05:53 -0700 Subject: [PATCH] Add `BrokerClient` implementation (#17382) This patch is extracted from PR 17353. Changes: - Added BrokerClient and BrokerClientImpl to the sql package that leverages the ServiceClient functionality; similar to OverlordClient and CoordinatorClient implementations in the server module. - For now, only two broker API stubs are added: submitSqlTask() and fetchExplainPlan(). - Added a new POJO class ExplainPlan that encapsulates explain plan info. - Deprecated org.apache.druid.discovery.BrokerClient in favor of the new BrokerClient in this patch. - Clean up ExplainAttributesTest a bit and added serde verification. --- .../apache/druid/discovery/BrokerClient.java | 4 + .../druid/rpc/guice/ServiceClientModule.java | 15 +- .../rpc/guice/ServiceClientModuleTest.java | 99 ++++++++++++ .../calcite/planner/ExplainAttributes.java | 26 +++ .../org/apache/druid/sql/client/Broker.java | 34 ++++ .../apache/druid/sql/client/BrokerClient.java | 51 ++++++ .../druid/sql/client/BrokerClientImpl.java | 84 ++++++++++ .../druid/sql/guice/BrokerServiceModule.java | 91 +++++++++++ .../apache/druid/sql/http/ExplainPlan.java | 117 ++++++++++++++ .../planner/ExplainAttributesTest.java | 81 ++++++---- .../sql/client/BrokerClientImplTest.java | 148 ++++++++++++++++++ .../sql/guice/BrokerServiceModuleTest.java | 114 ++++++++++++++ .../druid/sql/http/ExplainPlanTest.java | 122 +++++++++++++++ 13 files changed, 953 insertions(+), 33 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java create mode 100644 sql/src/main/java/org/apache/druid/sql/client/Broker.java create mode 100644 sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java create mode 100644 sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java create mode 100644 sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java create mode 100644 sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java create mode 100644 sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java create mode 100644 sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java create mode 100644 sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java index bdee9b8dfe4..a0ddbf42bed 100644 --- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java +++ b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.rpc.ServiceClient; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -41,7 +42,10 @@ import java.util.concurrent.ExecutionException; /** * This class facilitates interaction with Broker. + * Note that this should be removed and reconciled with org.apache.druid.sql.client.BrokerClient, which has the + * built-in functionality of {@link ServiceClient}, and proper Guice and service discovery wired in. */ +@Deprecated public class BrokerClient { private static final int MAX_RETRIES = 5; diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java index 51dd2b89d73..94dfeb29d95 100644 --- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java +++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java @@ -47,8 +47,8 @@ import java.util.concurrent.ScheduledExecutorService; public class ServiceClientModule implements DruidModule { + public static final int CLIENT_MAX_ATTEMPTS = 6; private static final int CONNECT_EXEC_THREADS = 4; - private static final int CLIENT_MAX_ATTEMPTS = 6; @Override public void configure(Binder binder) @@ -59,11 +59,9 @@ public class ServiceClientModule implements DruidModule @Provides @LazySingleton @EscalatedGlobal - public ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final HttpClient httpClient) + public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final HttpClient httpClient) { - final ScheduledExecutorService connectExec = - ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, "ServiceClientFactory-%d"); - return new ServiceClientFactoryImpl(httpClient, connectExec); + return makeServiceClientFactory(httpClient); } @Provides @@ -117,4 +115,11 @@ public class ServiceClientModule implements DruidModule jsonMapper ); } + + public static ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final HttpClient httpClient) + { + final ScheduledExecutorService connectExec = + ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, "ServiceClientFactory-%d"); + return new ServiceClientFactoryImpl(httpClient, connectExec); + } } diff --git a/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java new file mode 100644 index 00000000000..5d8a07d20a8 --- /dev/null +++ b/server/src/test/java/org/apache/druid/rpc/guice/ServiceClientModuleTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.guice; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.jackson.JacksonModule; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.junit.Assert.assertNotNull; + +public class ServiceClientModuleTest +{ + private Injector injector; + + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private HttpClient httpClient; + + @Mock + private DruidNodeDiscoveryProvider discoveryProvider; + + @Mock + private ServiceLocator serviceLocator; + + @Mock + private ServiceClientFactory serviceClientFactory; + + @Before + public void setUp() + { + injector = Guice.createInjector( + ImmutableList.of( + new DruidGuiceExtensions(), + new LifecycleModule(), + new JacksonModule(), + new ServiceClientModule(), + binder -> { + binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient); + binder.bind(ServiceLocator.class).toInstance(serviceLocator); + binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider); + binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory); + } + ) + ); + } + + @Test + public void testGetServiceClientFactory() + { + assertNotNull(injector.getInstance(ServiceClientFactory.class)); + } + + @Test + public void testGetOverlordClient() + { + assertNotNull(injector.getInstance(OverlordClient.class)); + } + + @Test + public void testGetCoordinatorClient() + { + assertNotNull(injector.getInstance(CoordinatorClient.class)); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java index e2ae4fa7a10..533de7d58f2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java @@ -19,12 +19,14 @@ package org.apache.druid.sql.calcite.planner; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.granularity.Granularity; import javax.annotation.Nullable; import java.util.List; +import java.util.Objects; /** * ExplainAttributes holds the attributes of a SQL statement that is used in the EXPLAIN PLAN result. @@ -45,6 +47,7 @@ public final class ExplainAttributes @Nullable private final String replaceTimeChunks; + @JsonCreator public ExplainAttributes( @JsonProperty("statementType") final String statementType, @JsonProperty("targetDataSource") @Nullable final String targetDataSource, @@ -117,6 +120,29 @@ public final class ExplainAttributes return replaceTimeChunks; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExplainAttributes that = (ExplainAttributes) o; + return Objects.equals(statementType, that.statementType) + && Objects.equals(targetDataSource, that.targetDataSource) + && Objects.equals(partitionedBy, that.partitionedBy) + && Objects.equals(clusteredBy, that.clusteredBy) + && Objects.equals(replaceTimeChunks, that.replaceTimeChunks); + } + + @Override + public int hashCode() + { + return Objects.hash(statementType, targetDataSource, partitionedBy, clusteredBy, replaceTimeChunks); + } + @Override public String toString() { diff --git a/sql/src/main/java/org/apache/druid/sql/client/Broker.java b/sql/src/main/java/org/apache/druid/sql/client/Broker.java new file mode 100644 index 00000000000..fb20c5166c8 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/client/Broker.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.client; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Broker +{ +} diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java new file mode 100644 index 00000000000..14cbbb7bff6 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.client; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.sql.http.ExplainPlan; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlTaskStatus; + +import java.util.List; + +/** + * High-level Broker client. + *

+ * All methods return futures, enabling asynchronous logic. If you want a synchronous response, use + * {@code FutureUtils.get} or {@code FutureUtils.getUnchecked}. + * Futures resolve to exceptions in the manner described by {@link org.apache.druid.rpc.ServiceClient#asyncRequest}. + *

+ * Typically acquired via Guice, where it is registered using {@link org.apache.druid.rpc.guice.ServiceClientModule}. + */ +public interface BrokerClient +{ + /** + * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint. + */ + ListenableFuture submitSqlTask(SqlQuery sqlQuery); + + /** + * Fetches the explain plan for the given {@code sqlQuery} from the Broker's SQL task endpoint. + * + * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} information is to be fetched + */ + ListenableFuture> fetchExplainPlan(SqlQuery sqlQuery); +} diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java new file mode 100644 index 00000000000..b3e064341e6 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.sql.http.ExplainPlan; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlTaskStatus; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import java.util.List; + +public class BrokerClientImpl implements BrokerClient +{ + private final ServiceClient client; + private final ObjectMapper jsonMapper; + + public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMapper) + { + this.client = client; + this.jsonMapper = jsonMapper; + } + + @Override + public ListenableFuture submitSqlTask(final SqlQuery sqlQuery) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/") + .jsonContent(jsonMapper, sqlQuery), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), SqlTaskStatus.class) + ); + } + + @Override + public ListenableFuture> fetchExplainPlan(final SqlQuery sqlQuery) + { + final SqlQuery explainSqlQuery = new SqlQuery( + StringUtils.format("EXPLAIN PLAN FOR %s", sqlQuery.getQuery()), + null, + false, + false, + false, + null, + null + ); + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/") + .jsonContent(jsonMapper, explainSqlQuery), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference>() {}) + ); + } +} + diff --git a/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java new file mode 100644 index 00000000000..05e022f8310 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.guice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Provides; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.rpc.DiscoveryServiceLocator; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.rpc.guice.ServiceClientModule; +import org.apache.druid.sql.client.Broker; +import org.apache.druid.sql.client.BrokerClient; +import org.apache.druid.sql.client.BrokerClientImpl; + +/** + * Module that processes can install if they require a {@link BrokerClient}. + *

+ * Similar to {@link ServiceClientModule}, but since {@link BrokerClient} depends + * on classes from the sql module, this is a separate module within the sql package. + *

+ */ +public class BrokerServiceModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + // Nothing to do. + } + + @Provides + @LazySingleton + @EscalatedGlobal + public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final HttpClient httpClient) + { + return ServiceClientModule.makeServiceClientFactory(httpClient); + } + + @Provides + @ManageLifecycle + @Broker + public ServiceLocator makeBrokerServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider) + { + return new DiscoveryServiceLocator(discoveryProvider, NodeRole.BROKER); + } + + @Provides + @LazySingleton + public BrokerClient makeBrokerClient( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Broker final ServiceLocator serviceLocator + ) + { + return new BrokerClientImpl( + clientFactory.makeClient( + NodeRole.BROKER.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build() + ), + jsonMapper + ); + } +} + diff --git a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java new file mode 100644 index 00000000000..68defc4b2a4 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.druid.sql.calcite.planner.ExplainAttributes; + +import java.io.IOException; +import java.util.Objects; + +/** + * Class that encapsulates the information of a single plan for an {@code EXPLAIN PLAN FOR} query. + *

+ * Similar to {@link #getAttributes()}, it's possible to provide more structure to {@link #getPlan()}, + * at least for the native query explain, but there's currently no use case for it. + *

+ */ +public class ExplainPlan +{ + @JsonProperty("PLAN") + private final String plan; + + @JsonProperty("RESOURCES") + private final String resources; + + @JsonProperty("ATTRIBUTES") + @JsonDeserialize(using = ExplainAttributesDeserializer.class) + private final ExplainAttributes attributes; + + @JsonCreator + public ExplainPlan( + @JsonProperty("PLAN") final String plan, + @JsonProperty("RESOURCES") final String resources, + @JsonProperty("ATTRIBUTES") final ExplainAttributes attributes + ) + { + this.plan = plan; + this.resources = resources; + this.attributes = attributes; + } + + public String getPlan() + { + return plan; + } + + public String getResources() + { + return resources; + } + + public ExplainAttributes getAttributes() + { + return attributes; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExplainPlan that = (ExplainPlan) o; + return Objects.equals(plan, that.plan) + && Objects.equals(resources, that.resources) + && Objects.equals(attributes, that.attributes); + } + + @Override + public int hashCode() + { + return Objects.hash(plan, resources, attributes); + } + + /** + * Custom deserializer for {@link ExplainAttributes} because the value for {@link #attributes} in the plan + * is encoded as a JSON string. This deserializer tells Jackson on how to parse the JSON string + * and map it to the fields in the {@link ExplainAttributes} class. + */ + private static class ExplainAttributesDeserializer extends JsonDeserializer + { + @Override + public ExplainAttributes deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException + { + final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec(); + return objectMapper.readValue(jsonParser.getText(), ExplainAttributes.class); + } + } +} + + diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java index 53e2abf2749..d203dd34002 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java @@ -19,8 +19,8 @@ package org.apache.druid.sql.calcite.planner; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.granularity.Granularities; import org.junit.Assert; @@ -28,14 +28,16 @@ import org.junit.Test; import java.util.Arrays; +import static org.junit.Assert.assertEquals; + public class ExplainAttributesTest { - private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new DefaultObjectMapper(); + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @Test - public void testSimpleGetters() + public void testGetters() { - ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, null, null, null); + final ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, null, null, null); Assert.assertEquals("SELECT", selectAttributes.getStatementType()); Assert.assertNull(selectAttributes.getTargetDataSource()); Assert.assertNull(selectAttributes.getPartitionedBy()); @@ -44,9 +46,9 @@ public class ExplainAttributesTest } @Test - public void testSerializeSelectAttributes() throws JsonProcessingException + public void testSerdeOfSelectAttributes() { - ExplainAttributes selectAttributes = new ExplainAttributes( + final ExplainAttributes selectAttributes = new ExplainAttributes( "SELECT", null, null, @@ -56,13 +58,14 @@ public class ExplainAttributesTest final String expectedAttributes = "{" + "\"statementType\":\"SELECT\"" + "}"; - Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(selectAttributes)); + + testSerde(selectAttributes, expectedAttributes); } @Test - public void testSerializeInsertAttributes() throws JsonProcessingException + public void testSerdeOfInsertAttributes() { - ExplainAttributes insertAttributes = new ExplainAttributes( + final ExplainAttributes insertAttributes = new ExplainAttributes( "INSERT", "foo", Granularities.DAY, @@ -74,13 +77,13 @@ public class ExplainAttributesTest + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":\"DAY\"" + "}"; - Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes)); + testSerde(insertAttributes, expectedAttributes); } @Test - public void testSerializeInsertAllAttributes() throws JsonProcessingException + public void testSerdeOfInsertAllAttributes() { - ExplainAttributes insertAttributes = new ExplainAttributes( + final ExplainAttributes insertAttributes = new ExplainAttributes( "INSERT", "foo", Granularities.ALL, @@ -92,78 +95,100 @@ public class ExplainAttributesTest + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":{\"type\":\"all\"}" + "}"; - Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes)); + testSerde(insertAttributes, expectedAttributes); } @Test - public void testSerializeReplaceAttributes() throws JsonProcessingException + public void testSerdeOfReplaceAttributes() { - ExplainAttributes replaceAttributes1 = new ExplainAttributes( + final ExplainAttributes replaceAttributes = new ExplainAttributes( "REPLACE", "foo", Granularities.HOUR, null, "ALL" ); - final String expectedAttributes1 = "{" + final String expectedAttributes = "{" + "\"statementType\":\"REPLACE\"," + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":\"HOUR\"," + "\"replaceTimeChunks\":\"ALL\"" + "}"; - Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1)); + testSerde(replaceAttributes, expectedAttributes); + } - ExplainAttributes replaceAttributes2 = new ExplainAttributes( + @Test + public void testSerdeOfReplaceAttributesWithTimeChunks() + { + final ExplainAttributes replaceAttributes = new ExplainAttributes( "REPLACE", "foo", Granularities.HOUR, null, "2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z" ); - final String expectedAttributes2 = "{" + final String expectedAttributes = "{" + "\"statementType\":\"REPLACE\"," + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":\"HOUR\"," + "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\"" + "}"; - Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2)); + testSerde(replaceAttributes, expectedAttributes); } @Test - public void testSerializeReplaceWithClusteredByAttributes() throws JsonProcessingException + public void testReplaceAttributesWithClusteredBy() { - ExplainAttributes replaceAttributes1 = new ExplainAttributes( + final ExplainAttributes replaceAttributes = new ExplainAttributes( "REPLACE", "foo", Granularities.HOUR, Arrays.asList("foo", "CEIL(`f2`)"), "ALL" ); - final String expectedAttributes1 = "{" + final String expectedAttributes = "{" + "\"statementType\":\"REPLACE\"," + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":\"HOUR\"," + "\"clusteredBy\":[\"foo\",\"CEIL(`f2`)\"]," + "\"replaceTimeChunks\":\"ALL\"" + "}"; - Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1)); + testSerde(replaceAttributes, expectedAttributes); + } - - ExplainAttributes replaceAttributes2 = new ExplainAttributes( + @Test + public void testReplaceAttributesWithClusteredByAndTimeChunks() + { + final ExplainAttributes replaceAttributes = new ExplainAttributes( "REPLACE", "foo", Granularities.HOUR, Arrays.asList("foo", "boo"), "2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z" ); - final String expectedAttributes2 = "{" + final String expectedAttributes = "{" + "\"statementType\":\"REPLACE\"," + "\"targetDataSource\":\"foo\"," + "\"partitionedBy\":\"HOUR\"," + "\"clusteredBy\":[\"foo\",\"boo\"]," + "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\"" + "}"; - Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2)); + testSerde(replaceAttributes, expectedAttributes); } + + private void testSerde(final ExplainAttributes explainAttributes, final String expectedSerializedAttributes) + { + final ExplainAttributes observedAttributes; + try { + final String observedSerializedAttributes = MAPPER.writeValueAsString(explainAttributes); + assertEquals(expectedSerializedAttributes, observedSerializedAttributes); + observedAttributes = MAPPER.readValue(observedSerializedAttributes, ExplainAttributes.class); + } + catch (Exception e) { + throw DruidException.defensive(e, "Error serializing/deserializing explain plan[%s].", explainAttributes); + } + assertEquals(explainAttributes, observedAttributes); + } + } diff --git a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java new file mode 100644 index 00000000000..51d66f03816 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.avatica.SqlType; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.rpc.MockServiceClient; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.sql.calcite.planner.ExplainAttributes; +import org.apache.druid.sql.http.ExplainPlan; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.sql.http.SqlParameter; +import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlTaskStatus; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class BrokerClientImplTest +{ + private ObjectMapper jsonMapper; + private MockServiceClient serviceClient; + private BrokerClient brokerClient; + + @Before + public void setup() + { + jsonMapper = new DefaultObjectMapper(); + serviceClient = new MockServiceClient(); + brokerClient = new BrokerClientImpl(serviceClient, jsonMapper); + } + + @After + public void tearDown() + { + serviceClient.verify(); + } + + @Test + public void testSubmitSqlTask() throws Exception + { + final SqlQuery query = new SqlQuery( + "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL", + ResultFormat.ARRAY, + true, + true, + true, + ImmutableMap.of("useCache", false), + ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1)) + ); + final SqlTaskStatus taskStatus = new SqlTaskStatus("taskId1", TaskState.RUNNING, null); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/") + .jsonContent(jsonMapper, query), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(taskStatus) + ); + + assertEquals(taskStatus, brokerClient.submitSqlTask(query).get()); + } + + @Test + public void testFetchExplainPlan() throws Exception + { + final SqlQuery query = new SqlQuery( + "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL", + ResultFormat.ARRAY, + true, + true, + true, + ImmutableMap.of("useCache", false), + ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1)) + ); + final SqlQuery explainQuery = new SqlQuery( + StringUtils.format("EXPLAIN PLAN FOR %s", query.getQuery()), + null, + false, + false, + false, + null, + null + ); + + final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; + final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"; + final ExplainAttributes attributes = new ExplainAttributes("REPLACE", "foo", Granularities.ALL, null, "all"); + + final List> givenPlans = ImmutableList.of( + ImmutableMap.of( + "PLAN", + plan, + "RESOURCES", + resources, + "ATTRIBUTES", + jsonMapper.writeValueAsString(attributes) + ) + ); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/") + .jsonContent(jsonMapper, explainQuery), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(givenPlans) + ); + + assertEquals( + ImmutableList.of(new ExplainPlan(plan, resources, attributes)), + brokerClient.fetchExplainPlan(query).get() + ); + } + +} diff --git a/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java new file mode 100644 index 00000000000..9b59ad86733 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/guice/BrokerServiceModuleTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.guice; + +import com.google.common.collect.ImmutableList; +import com.google.inject.ConfigurationException; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.guice.DruidGuiceExtensions; +import org.apache.druid.guice.LifecycleModule; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.jackson.JacksonModule; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.sql.client.BrokerClient; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +public class BrokerServiceModuleTest +{ + private Injector injector; + + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private HttpClient httpClient; + + @Mock + private DruidNodeDiscoveryProvider discoveryProvider; + + @Mock + private ServiceLocator serviceLocator; + + @Mock + private ServiceClientFactory serviceClientFactory; + + @Before + public void setUp() + { + injector = Guice.createInjector( + ImmutableList.of( + new DruidGuiceExtensions(), + new LifecycleModule(), + new JacksonModule(), + new BrokerServiceModule(), + binder -> { + binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient); + binder.bind(ServiceLocator.class).toInstance(serviceLocator); + binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider); + binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory); + } + ) + ); + } + + @Test + public void testGetServiceClientFactory() + { + assertNotNull(injector.getInstance(ServiceClientFactory.class)); + } + + @Test + public void testGetBrokerClient() + { + assertNotNull(injector.getInstance(BrokerClient.class)); + } + + @Test + public void testGetCoordinatorClient() + { + assertThrows( + ConfigurationException.class, + () -> injector.getInstance(CoordinatorClient.class) + ); + } + + @Test + public void testGetOverlordClient() + { + assertThrows( + ConfigurationException.class, + () -> injector.getInstance(OverlordClient.class) + ); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java new file mode 100644 index 00000000000..e3385fc5f51 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.sql.calcite.planner.ExplainAttributes; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ExplainPlanTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testExplainPlanSerdeForSelectQuery() throws JsonProcessingException + { + final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"EXPR$0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"EXPR$0\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"EXPR$0\",\"outputColumn\":\"EXPR$0\"}]}]"; + final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"; + final ExplainAttributes attributes = new ExplainAttributes("SELECT", null, null, null, null); + + final List> givenPlans = ImmutableList.of( + ImmutableMap.of( + "PLAN", + plan, + "RESOURCES", + resources, + "ATTRIBUTES", + MAPPER.writeValueAsString(attributes) + ) + ); + + testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes))); + } + + @Test + public void testExplainPlanSerdeForReplaceQuery() throws JsonProcessingException + { + final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; + final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]"; + final ExplainAttributes attributes = new ExplainAttributes("REPLACE", "dst", Granularities.ALL, null, "all"); + + final List> givenPlans = ImmutableList.of( + ImmutableMap.of( + "PLAN", + plan, + "RESOURCES", + resources, + "ATTRIBUTES", + MAPPER.writeValueAsString(attributes) + ) + ); + + testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes))); + } + + @Test + public void testExplainPlanSerdeForInsertQuery() throws JsonProcessingException + { + final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]"; + final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]"; + final ExplainAttributes attributes = new ExplainAttributes("INSERT", "foo", Granularities.DAY, ImmutableList.of("floor_m1", "dim1", "CEIL(\"m2\")"), null); + + final List> givenPlans = ImmutableList.of( + ImmutableMap.of( + "PLAN", + plan, + "RESOURCES", + resources, + "ATTRIBUTES", + MAPPER.writeValueAsString(attributes) + ) + ); + + testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes))); + } + + private void testSerde( + final List> givenPlans, + final List expectedExplainPlans + ) + { + final List observedExplainPlans; + try { + observedExplainPlans = MAPPER.readValue( + MAPPER.writeValueAsString(givenPlans), + new TypeReference>() {} + ); + } + catch (Exception e) { + throw DruidException.defensive(e, "Error deserializing given plans[%s] into explain plans.", givenPlans); + } + assertEquals(expectedExplainPlans, observedExplainPlans); + } +}