Add `BrokerClient` implementation (#17382)

This patch is extracted from PR 17353.

Changes:

- Added BrokerClient and BrokerClientImpl to the sql package; these leverage the ServiceClient functionality, similar to the OverlordClient and CoordinatorClient implementations in the server module.
- For now, only two broker API stubs are added: submitSqlTask() and fetchExplainPlan().
- Added a new POJO class ExplainPlan that encapsulates explain plan info.
- Deprecated org.apache.druid.discovery.BrokerClient in favor of the new BrokerClient in this patch.
- Cleaned up ExplainAttributesTest a bit and added serde verification.
This commit is contained in:
Abhishek Radhakrishnan 2024-10-21 11:05:53 -07:00 committed by GitHub
parent 5da9949992
commit 187e21afae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 953 additions and 33 deletions

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.rpc.ServiceClient;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -41,7 +42,10 @@ import java.util.concurrent.ExecutionException;
/**
* This class facilitates interaction with Broker.
* Note that this should be removed and reconciled with org.apache.druid.sql.client.BrokerClient, which has the
* built-in functionality of {@link ServiceClient}, and proper Guice and service discovery wired in.
*/
@Deprecated
public class BrokerClient
{
private static final int MAX_RETRIES = 5;

View File

@ -47,8 +47,8 @@ import java.util.concurrent.ScheduledExecutorService;
public class ServiceClientModule implements DruidModule
{
public static final int CLIENT_MAX_ATTEMPTS = 6;
private static final int CONNECT_EXEC_THREADS = 4;
private static final int CLIENT_MAX_ATTEMPTS = 6;
@Override
public void configure(Binder binder)
@ -59,11 +59,9 @@ public class ServiceClientModule implements DruidModule
@Provides
@LazySingleton
@EscalatedGlobal
public ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final HttpClient httpClient)
public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final HttpClient httpClient)
{
final ScheduledExecutorService connectExec =
ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, "ServiceClientFactory-%d");
return new ServiceClientFactoryImpl(httpClient, connectExec);
return makeServiceClientFactory(httpClient);
}
@Provides
@ -117,4 +115,11 @@ public class ServiceClientModule implements DruidModule
jsonMapper
);
}
public static ServiceClientFactory makeServiceClientFactory(@EscalatedGlobal final HttpClient httpClient)
{
final ScheduledExecutorService connectExec =
ScheduledExecutors.fixed(CONNECT_EXEC_THREADS, "ServiceClientFactory-%d");
return new ServiceClientFactoryImpl(httpClient, connectExec);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.rpc.guice;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import static org.junit.Assert.assertNotNull;
public class ServiceClientModuleTest
{
private Injector injector;
@Rule
public MockitoRule mockitoRule = MockitoJUnit.rule();
@Mock
private HttpClient httpClient;
@Mock
private DruidNodeDiscoveryProvider discoveryProvider;
@Mock
private ServiceLocator serviceLocator;
@Mock
private ServiceClientFactory serviceClientFactory;
@Before
public void setUp()
{
injector = Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new LifecycleModule(),
new JacksonModule(),
new ServiceClientModule(),
binder -> {
binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
binder.bind(ServiceLocator.class).toInstance(serviceLocator);
binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
}
)
);
}
@Test
public void testGetServiceClientFactory()
{
assertNotNull(injector.getInstance(ServiceClientFactory.class));
}
@Test
public void testGetOverlordClient()
{
assertNotNull(injector.getInstance(OverlordClient.class));
}
@Test
public void testGetCoordinatorClient()
{
assertNotNull(injector.getInstance(CoordinatorClient.class));
}
}

View File

@ -19,12 +19,14 @@
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
/**
* ExplainAttributes holds the attributes of a SQL statement that is used in the EXPLAIN PLAN result.
@ -45,6 +47,7 @@ public final class ExplainAttributes
@Nullable
private final String replaceTimeChunks;
@JsonCreator
public ExplainAttributes(
@JsonProperty("statementType") final String statementType,
@JsonProperty("targetDataSource") @Nullable final String targetDataSource,
@ -117,6 +120,29 @@ public final class ExplainAttributes
return replaceTimeChunks;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExplainAttributes that = (ExplainAttributes) o;
return Objects.equals(statementType, that.statementType)
&& Objects.equals(targetDataSource, that.targetDataSource)
&& Objects.equals(partitionedBy, that.partitionedBy)
&& Objects.equals(clusteredBy, that.clusteredBy)
&& Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
}
@Override
public int hashCode()
{
return Objects.hash(statementType, targetDataSource, partitionedBy, clusteredBy, replaceTimeChunks);
}
@Override
public String toString()
{

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.client;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Guice binding annotation that marks Broker-specific bindings — for example, the
 * {@code ServiceLocator} that locates Broker nodes (see {@code BrokerServiceModule}).
 */
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Broker
{
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.client;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.sql.http.ExplainPlan;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlTaskStatus;
import java.util.List;
/**
 * High-level Broker client.
 * <p>
 * All methods return futures, enabling asynchronous logic. If you want a synchronous response, use
 * {@code FutureUtils.get} or {@code FutureUtils.getUnchecked}.
 * Futures resolve to exceptions in the manner described by {@link org.apache.druid.rpc.ServiceClient#asyncRequest}.
 * </p>
 * Typically acquired via Guice, where it is registered using {@code org.apache.druid.sql.guice.BrokerServiceModule}.
 */
public interface BrokerClient
{
  /**
   * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
   *
   * @param sqlQuery the SQL query to submit as a task
   * @return a future that resolves to the status of the submitted SQL task
   */
  ListenableFuture<SqlTaskStatus> submitSqlTask(SqlQuery sqlQuery);

  /**
   * Fetches the explain plan for the given {@code sqlQuery} from the Broker's SQL task endpoint.
   *
   * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} information is to be fetched
   * @return a future that resolves to the list of explain plans for the query
   */
  ListenableFuture<List<ExplainPlan>> fetchExplainPlan(SqlQuery sqlQuery);
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.sql.http.ExplainPlan;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlTaskStatus;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.util.List;
/**
 * Implementation of {@link BrokerClient} that issues requests through a {@link ServiceClient}
 * and (de)serializes payloads with the supplied {@link ObjectMapper}.
 */
public class BrokerClientImpl implements BrokerClient
{
  /** Broker endpoint that accepts SQL task submissions. */
  private static final String SQL_TASK_PATH = "/druid/v2/sql/task/";

  private final ServiceClient client;
  private final ObjectMapper jsonMapper;

  public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMapper)
  {
    this.client = client;
    this.jsonMapper = jsonMapper;
  }

  @Override
  public ListenableFuture<SqlTaskStatus> submitSqlTask(final SqlQuery sqlQuery)
  {
    return FutureUtils.transform(
        client.asyncRequest(sqlTaskRequest(sqlQuery), new BytesFullResponseHandler()),
        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), SqlTaskStatus.class)
    );
  }

  @Override
  public ListenableFuture<List<ExplainPlan>> fetchExplainPlan(final SqlQuery sqlQuery)
  {
    // Wrap only the caller's query text in EXPLAIN PLAN FOR; all other SqlQuery fields are
    // deliberately left at their defaults for the explain request.
    final SqlQuery explainSqlQuery = new SqlQuery(
        StringUtils.format("EXPLAIN PLAN FOR %s", sqlQuery.getQuery()),
        null,
        false,
        false,
        false,
        null,
        null
    );
    return FutureUtils.transform(
        client.asyncRequest(sqlTaskRequest(explainSqlQuery), new BytesFullResponseHandler()),
        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<List<ExplainPlan>>() {})
    );
  }

  /** Builds a POST request to the Broker's SQL task endpoint with the given query as the JSON body. */
  private RequestBuilder sqlTaskRequest(final SqlQuery sqlQuery)
  {
    return new RequestBuilder(HttpMethod.POST, SQL_TASK_PATH).jsonContent(jsonMapper, sqlQuery);
  }
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Provides;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.DiscoveryServiceLocator;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.guice.ServiceClientModule;
import org.apache.druid.sql.client.Broker;
import org.apache.druid.sql.client.BrokerClient;
import org.apache.druid.sql.client.BrokerClientImpl;
/**
 * Module that processes can install if they require a {@link BrokerClient}.
 * <p>
 * Similar to {@link ServiceClientModule}, but since {@link BrokerClient} depends
 * on classes from the sql module, this is a separate module within the sql package.
 * </p>
 */
public class BrokerServiceModule implements DruidModule
{
  @Override
  public void configure(Binder binder)
  {
    // Nothing to do; all bindings are supplied by the @Provides methods below.
  }

  /**
   * Provides the escalated {@link ServiceClientFactory}, delegating to
   * {@link ServiceClientModule#makeServiceClientFactory} so processes that do not install
   * {@link ServiceClientModule} still get an equivalent factory.
   */
  @Provides
  @LazySingleton
  @EscalatedGlobal
  public ServiceClientFactory getServiceClientFactory(@EscalatedGlobal final HttpClient httpClient)
  {
    return ServiceClientModule.makeServiceClientFactory(httpClient);
  }

  /**
   * Provides a lifecycle-managed {@link ServiceLocator} that discovers Broker nodes
   * via the given {@link DruidNodeDiscoveryProvider}.
   */
  @Provides
  @ManageLifecycle
  @Broker
  public ServiceLocator makeBrokerServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider)
  {
    return new DiscoveryServiceLocator(discoveryProvider, NodeRole.BROKER);
  }

  /**
   * Provides the {@link BrokerClient}, built on a {@link org.apache.druid.rpc.ServiceClient}
   * that targets the Broker via the {@link Broker}-annotated locator and retries up to
   * {@link ServiceClientModule#CLIENT_MAX_ATTEMPTS} times.
   */
  @Provides
  @LazySingleton
  public BrokerClient makeBrokerClient(
      @Json final ObjectMapper jsonMapper,
      @EscalatedGlobal final ServiceClientFactory clientFactory,
      @Broker final ServiceLocator serviceLocator
  )
  {
    return new BrokerClientImpl(
        clientFactory.makeClient(
            NodeRole.BROKER.getJsonName(),
            serviceLocator,
            StandardRetryPolicy.builder().maxAttempts(ServiceClientModule.CLIENT_MAX_ATTEMPTS).build()
        ),
        jsonMapper
    );
  }
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.druid.sql.calcite.planner.ExplainAttributes;
import java.io.IOException;
import java.util.Objects;
/**
 * Class that encapsulates the information of a single plan for an {@code EXPLAIN PLAN FOR} query.
 * <p>
 * Similar to {@link #getAttributes()}, it's possible to provide more structure to {@link #getPlan()},
 * at least for the native query explain, but there's currently no use case for it.
 * </p>
 */
public class ExplainPlan
{
  @JsonProperty("PLAN")
  private final String plan;

  @JsonProperty("RESOURCES")
  private final String resources;

  // The ATTRIBUTES value in the plan arrives as a JSON-encoded string, hence the custom deserializer.
  @JsonProperty("ATTRIBUTES")
  @JsonDeserialize(using = ExplainAttributesDeserializer.class)
  private final ExplainAttributes attributes;

  @JsonCreator
  public ExplainPlan(
      @JsonProperty("PLAN") final String plan,
      @JsonProperty("RESOURCES") final String resources,
      @JsonProperty("ATTRIBUTES") final ExplainAttributes attributes
  )
  {
    this.plan = plan;
    this.resources = resources;
    this.attributes = attributes;
  }

  public String getPlan()
  {
    return plan;
  }

  public String getResources()
  {
    return resources;
  }

  public ExplainAttributes getAttributes()
  {
    return attributes;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ExplainPlan that = (ExplainPlan) o;
    return Objects.equals(plan, that.plan)
           && Objects.equals(resources, that.resources)
           && Objects.equals(attributes, that.attributes);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(plan, resources, attributes);
  }

  // Added for consistency with ExplainAttributes, which overrides toString alongside
  // equals/hashCode; useful for readable assertion failures and log output.
  @Override
  public String toString()
  {
    return "ExplainPlan{" +
           "plan='" + plan + '\'' +
           ", resources='" + resources + '\'' +
           ", attributes=" + attributes +
           '}';
  }

  /**
   * Custom deserializer for {@link ExplainAttributes} because the value for {@link #attributes} in the plan
   * is encoded as a JSON string. This deserializer tells Jackson on how to parse the JSON string
   * and map it to the fields in the {@link ExplainAttributes} class.
   */
  private static class ExplainAttributesDeserializer extends JsonDeserializer<ExplainAttributes>
  {
    @Override
    public ExplainAttributes deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException
    {
      final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
      return objectMapper.readValue(jsonParser.getText(), ExplainAttributes.class);
    }
  }
}

View File

@ -19,8 +19,8 @@
package org.apache.druid.sql.calcite.planner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.junit.Assert;
@ -28,14 +28,16 @@ import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
public class ExplainAttributesTest
{
private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new DefaultObjectMapper();
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
public void testSimpleGetters()
public void testGetters()
{
ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, null, null, null);
final ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, null, null, null);
Assert.assertEquals("SELECT", selectAttributes.getStatementType());
Assert.assertNull(selectAttributes.getTargetDataSource());
Assert.assertNull(selectAttributes.getPartitionedBy());
@ -44,9 +46,9 @@ public class ExplainAttributesTest
}
@Test
public void testSerializeSelectAttributes() throws JsonProcessingException
public void testSerdeOfSelectAttributes()
{
ExplainAttributes selectAttributes = new ExplainAttributes(
final ExplainAttributes selectAttributes = new ExplainAttributes(
"SELECT",
null,
null,
@ -56,13 +58,14 @@ public class ExplainAttributesTest
final String expectedAttributes = "{"
+ "\"statementType\":\"SELECT\""
+ "}";
Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(selectAttributes));
testSerde(selectAttributes, expectedAttributes);
}
@Test
public void testSerializeInsertAttributes() throws JsonProcessingException
public void testSerdeOfInsertAttributes()
{
ExplainAttributes insertAttributes = new ExplainAttributes(
final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.DAY,
@ -74,13 +77,13 @@ public class ExplainAttributesTest
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"DAY\""
+ "}";
Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
testSerde(insertAttributes, expectedAttributes);
}
@Test
public void testSerializeInsertAllAttributes() throws JsonProcessingException
public void testSerdeOfInsertAllAttributes()
{
ExplainAttributes insertAttributes = new ExplainAttributes(
final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.ALL,
@ -92,78 +95,100 @@ public class ExplainAttributesTest
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":{\"type\":\"all\"}"
+ "}";
Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
testSerde(insertAttributes, expectedAttributes);
}
@Test
public void testSerializeReplaceAttributes() throws JsonProcessingException
public void testSerdeOfReplaceAttributes()
{
ExplainAttributes replaceAttributes1 = new ExplainAttributes(
final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"ALL"
);
final String expectedAttributes1 = "{"
final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
testSerde(replaceAttributes, expectedAttributes);
}
ExplainAttributes replaceAttributes2 = new ExplainAttributes(
@Test
public void testSerdeOfReplaceAttributesWithTimeChunks()
{
final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
final String expectedAttributes2 = "{"
final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
testSerde(replaceAttributes, expectedAttributes);
}
@Test
public void testSerializeReplaceWithClusteredByAttributes() throws JsonProcessingException
public void testReplaceAttributesWithClusteredBy()
{
ExplainAttributes replaceAttributes1 = new ExplainAttributes(
final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "CEIL(`f2`)"),
"ALL"
);
final String expectedAttributes1 = "{"
final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"clusteredBy\":[\"foo\",\"CEIL(`f2`)\"],"
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
testSerde(replaceAttributes, expectedAttributes);
}
ExplainAttributes replaceAttributes2 = new ExplainAttributes(
@Test
public void testReplaceAttributesWithClusteredByAndTimeChunks()
{
final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "boo"),
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
final String expectedAttributes2 = "{"
final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"clusteredBy\":[\"foo\",\"boo\"],"
+ "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
testSerde(replaceAttributes, expectedAttributes);
}
private void testSerde(final ExplainAttributes explainAttributes, final String expectedSerializedAttributes)
{
final ExplainAttributes observedAttributes;
try {
final String observedSerializedAttributes = MAPPER.writeValueAsString(explainAttributes);
assertEquals(expectedSerializedAttributes, observedSerializedAttributes);
observedAttributes = MAPPER.readValue(observedSerializedAttributes, ExplainAttributes.class);
}
catch (Exception e) {
throw DruidException.defensive(e, "Error serializing/deserializing explain plan[%s].", explainAttributes);
}
assertEquals(explainAttributes, observedAttributes);
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.avatica.SqlType;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.sql.calcite.planner.ExplainAttributes;
import org.apache.druid.sql.http.ExplainPlan;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlTaskStatus;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
 * Unit tests for {@code BrokerClientImpl}, driven by a {@code MockServiceClient} that
 * records expected HTTP requests and canned responses.
 */
public class BrokerClientImplTest
{
  private ObjectMapper jsonMapper;
  private MockServiceClient serviceClient;
  private BrokerClient brokerClient;

  @Before
  public void setup()
  {
    jsonMapper = new DefaultObjectMapper();
    serviceClient = new MockServiceClient();
    brokerClient = new BrokerClientImpl(serviceClient, jsonMapper);
  }

  @After
  public void tearDown()
  {
    // Asserts that every request registered via expectAndRespond() was actually issued.
    serviceClient.verify();
  }

  @Test
  public void testSubmitSqlTask() throws Exception
  {
    final SqlQuery query = new SqlQuery(
        "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
        ResultFormat.ARRAY,
        true,
        true,
        true,
        ImmutableMap.of("useCache", false),
        ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
    );
    final SqlTaskStatus taskStatus = new SqlTaskStatus("taskId1", TaskState.RUNNING, null);
    // Expect the query to be POSTed verbatim to the SQL task endpoint and reply with the task status.
    serviceClient.expectAndRespond(
        new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
            .jsonContent(jsonMapper, query),
        HttpResponseStatus.OK,
        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
        jsonMapper.writeValueAsBytes(taskStatus)
    );
    assertEquals(taskStatus, brokerClient.submitSqlTask(query).get());
  }

  @Test
  public void testFetchExplainPlan() throws Exception
  {
    final SqlQuery query = new SqlQuery(
        "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
        ResultFormat.ARRAY,
        true,
        true,
        true,
        ImmutableMap.of("useCache", false),
        ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
    );
    // The client is expected to wrap the query text in EXPLAIN PLAN FOR with default query options.
    final SqlQuery explainQuery = new SqlQuery(
        StringUtils.format("EXPLAIN PLAN FOR %s", query.getQuery()),
        null,
        false,
        false,
        false,
        null,
        null
    );
    // Canned Broker response fixtures: a native scan query plan and its associated resources.
    final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
    final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
    final ExplainAttributes attributes = new ExplainAttributes("REPLACE", "foo", Granularities.ALL, null, "all");
    // ATTRIBUTES is serialized to a JSON string here because that is how the Broker encodes it
    // in the explain response (ExplainPlan uses a custom deserializer for this field).
    final List<Map<String, Object>> givenPlans = ImmutableList.of(
        ImmutableMap.of(
            "PLAN",
            plan,
            "RESOURCES",
            resources,
            "ATTRIBUTES",
            jsonMapper.writeValueAsString(attributes)
        )
    );
    serviceClient.expectAndRespond(
        new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
            .jsonContent(jsonMapper, explainQuery),
        HttpResponseStatus.OK,
        ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
        jsonMapper.writeValueAsBytes(givenPlans)
    );
    assertEquals(
        ImmutableList.of(new ExplainPlan(plan, resources, attributes)),
        brokerClient.fetchExplainPlan(query).get()
    );
  }
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.guice;
import com.google.common.collect.ImmutableList;
import com.google.inject.ConfigurationException;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.sql.client.BrokerClient;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
/**
 * Verifies the Guice bindings supplied by {@code BrokerServiceModule}: the module is expected to
 * provide a {@link BrokerClient} (backed by a {@link ServiceClientFactory}), while deliberately
 * not binding the Coordinator or Overlord clients, so looking those up must fail.
 */
public class BrokerServiceModuleTest
{
  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule();

  @Mock
  private HttpClient httpClient;

  @Mock
  private DruidNodeDiscoveryProvider discoveryProvider;

  @Mock
  private ServiceLocator serviceLocator;

  @Mock
  private ServiceClientFactory serviceClientFactory;

  private Injector injector;

  @Before
  public void setUp()
  {
    // Stand up a minimal injector: core Druid Guice plumbing, the module under test, and
    // mock instances for the external dependencies the module expects to be bound elsewhere.
    injector = Guice.createInjector(
        ImmutableList.of(
            new DruidGuiceExtensions(),
            new LifecycleModule(),
            new JacksonModule(),
            new BrokerServiceModule(),
            binder -> {
              binder.bind(HttpClient.class).annotatedWith(EscalatedGlobal.class).toInstance(httpClient);
              binder.bind(DruidNodeDiscoveryProvider.class).toInstance(discoveryProvider);
              binder.bind(ServiceLocator.class).toInstance(serviceLocator);
              binder.bind(ServiceClientFactory.class).toInstance(serviceClientFactory);
            }
        )
    );
  }

  @Test
  public void testGetServiceClientFactory()
  {
    // The factory the module wires up for building broker service clients must be present.
    assertNotNull(injector.getInstance(ServiceClientFactory.class));
  }

  @Test
  public void testGetBrokerClient()
  {
    // The primary binding contributed by the module under test.
    assertNotNull(injector.getInstance(BrokerClient.class));
  }

  @Test
  public void testGetCoordinatorClient()
  {
    // The broker module must not leak a CoordinatorClient binding.
    assertThrows(
        ConfigurationException.class,
        () -> injector.getInstance(CoordinatorClient.class)
    );
  }

  @Test
  public void testGetOverlordClient()
  {
    // Likewise, no OverlordClient binding should be available from this module.
    assertThrows(
        ConfigurationException.class,
        () -> injector.getInstance(OverlordClient.class)
    );
  }
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.sql.calcite.planner.ExplainAttributes;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
 * Serde tests for {@link ExplainPlan}, covering the explain responses a Broker returns for
 * SELECT, REPLACE, and INSERT queries. Each test builds the raw row shape emitted by
 * {@code EXPLAIN PLAN} (keys {@code PLAN}, {@code RESOURCES}, {@code ATTRIBUTES}) and verifies it
 * round-trips into the expected {@link ExplainPlan} POJOs.
 */
public class ExplainPlanTest
{
  private static final ObjectMapper MAPPER = new DefaultObjectMapper();

  @Test
  public void testExplainPlanSerdeForSelectQuery() throws JsonProcessingException
  {
    final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"inline\",\"columnNames\":[\"EXPR$0\"],\"columnTypes\":[\"LONG\"],\"rows\":[[2]]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"EXPR$0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"EXPR$0\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"EXPR$0\",\"outputColumn\":\"EXPR$0\"}]}]";
    final String resources = "[{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
    final ExplainAttributes attributes = new ExplainAttributes("SELECT", null, null, null, null);

    testSerde(
        ImmutableList.of(explainPlanRow(plan, resources, attributes)),
        ImmutableList.of(new ExplainPlan(plan, resources, attributes))
    );
  }

  @Test
  public void testExplainPlanSerdeForReplaceQuery() throws JsonProcessingException
  {
    final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
    final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
    final ExplainAttributes attributes = new ExplainAttributes("REPLACE", "dst", Granularities.ALL, null, "all");

    testSerde(
        ImmutableList.of(explainPlanRow(plan, resources, attributes)),
        ImmutableList.of(new ExplainPlan(plan, resources, attributes))
    );
  }

  @Test
  public void testExplainPlanSerdeForInsertQuery() throws JsonProcessingException
  {
    final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
    final String resources = "[{\"name\":\"dst\",\"type\":\"DATASOURCE\"},{\"name\":\"foo\",\"type\":\"DATASOURCE\"}]";
    final ExplainAttributes attributes = new ExplainAttributes("INSERT", "foo", Granularities.DAY, ImmutableList.of("floor_m1", "dim1", "CEIL(\"m2\")"), null);

    testSerde(
        ImmutableList.of(explainPlanRow(plan, resources, attributes)),
        ImmutableList.of(new ExplainPlan(plan, resources, attributes))
    );
  }

  /**
   * Builds one raw explain-plan row in the shape produced by the Broker's {@code EXPLAIN PLAN}
   * response: a map keyed by {@code PLAN}, {@code RESOURCES}, and JSON-serialized
   * {@code ATTRIBUTES}. Extracted so the per-query-type tests don't each repeat this structure.
   *
   * @throws JsonProcessingException if the attributes cannot be serialized to JSON
   */
  private static Map<String, Object> explainPlanRow(
      final String plan,
      final String resources,
      final ExplainAttributes attributes
  ) throws JsonProcessingException
  {
    return ImmutableMap.of(
        "PLAN", plan,
        "RESOURCES", resources,
        "ATTRIBUTES", MAPPER.writeValueAsString(attributes)
    );
  }

  /**
   * Round-trips {@code givenPlans} through JSON and asserts the result deserializes into
   * {@code expectedExplainPlans}. Any serde failure is surfaced as a defensive
   * {@link DruidException} so the test fails with the offending input in the message.
   */
  private void testSerde(
      final List<Map<String, Object>> givenPlans,
      final List<ExplainPlan> expectedExplainPlans
  )
  {
    final List<ExplainPlan> observedExplainPlans;
    try {
      observedExplainPlans = MAPPER.readValue(
          MAPPER.writeValueAsString(givenPlans),
          new TypeReference<List<ExplainPlan>>() {}
      );
    }
    catch (Exception e) {
      throw DruidException.defensive(e, "Error deserializing given plans[%s] into explain plans.", givenPlans);
    }
    assertEquals(expectedExplainPlans, observedExplainPlans);
  }
}