From 100fa46e396ab0f68da6c4bef80951f6b996657e Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 2 Oct 2018 16:25:56 -0700 Subject: [PATCH] Added LeaderClient interface and NoopDruidLeaderClient class --- .../druid/benchmark/query/SqlBenchmark.java | 5 +- .../druid/discovery/DruidLeaderClient.java | 7 +- .../apache/druid/discovery/LeaderClient.java | 59 +++++++++++++++++ .../discovery/NoopDruidLeaderClient.java | 66 +++++++++++++++++++ .../druid/sql/calcite/planner/Calcites.java | 6 +- .../sql/calcite/planner/PlannerFactory.java | 10 +-- .../sql/calcite/schema/SystemSchema.java | 22 +++---- 7 files changed, 153 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/discovery/LeaderClient.java create mode 100644 server/src/main/java/org/apache/druid/discovery/NoopDruidLeaderClient.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java index 5c82513a3fb..c7f91284cb2 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -27,7 +27,8 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemas; import org.apache.druid.benchmark.datagen.SegmentGenerator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.Row; -import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.LeaderClient; +import org.apache.druid.discovery.NoopDruidLeaderClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; @@ -114,7 +115,7 @@ public class SqlBenchmark .createQueryRunnerFactoryConglomerate(); final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs; final PlannerConfig plannerConfig = new PlannerConfig(); - final DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class); + final LeaderClient druidLeaderClient = new NoopDruidLeaderClient(); this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index); plannerFactory = new PlannerFactory( diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java index bde95f18137..7d954a03eff 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java @@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; * request.setXXX(..) * FullResponseHolder responseHolder = druidLeaderClient.go(request) */ -public class DruidLeaderClient +public class DruidLeaderClient implements LeaderClient { private final Logger log = new Logger(DruidLeaderClient.class); @@ -93,6 +93,7 @@ public class DruidLeaderClient this.serverDiscoverySelector = serverDiscoverySelector; } + @Override @LifecycleStart public void start() { @@ -110,6 +111,7 @@ public class DruidLeaderClient } } + @Override @LifecycleStop public void stop() { @@ -123,6 +125,7 @@ public class DruidLeaderClient /** * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. */ + @Override public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); @@ -137,6 +140,7 @@ public class DruidLeaderClient /** * Executes the request object aimed at the leader and process the response with given handler */ + @Override public ListenableFuture goAsync( final Request request, final HttpResponseHandler handler @@ -148,6 +152,7 @@ public class DruidLeaderClient /** * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located. */ + @Override public FullResponseHolder go( Request request, HttpResponseHandler responseHandler diff --git a/server/src/main/java/org/apache/druid/discovery/LeaderClient.java b/server/src/main/java/org/apache/druid/discovery/LeaderClient.java new file mode 100644 index 00000000000..aa56f6a9636 --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/LeaderClient.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.druid.discovery; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import java.io.IOException; + +/** + * This is used to facilitate interaction with Coordinator/Overlord leader nodes. Instance of this interface is injected + * via Guice with annotations @Coordinator or @IndexingService . + */ +public interface LeaderClient +{ + + void start(); + + void stop(); + + /** + * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. + */ + Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException; + + /** + * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located. + */ + FullResponseHolder go(Request request, HttpResponseHandler responseHandler) + throws IOException, InterruptedException; + + /** + * Executes the request object aimed at the leader and process the response with given handler + */ + ListenableFuture goAsync( + Request request, + HttpResponseHandler handler + ); + +} diff --git a/server/src/main/java/org/apache/druid/discovery/NoopDruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/NoopDruidLeaderClient.java new file mode 100644 index 00000000000..d562487ce7e --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/NoopDruidLeaderClient.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.druid.discovery; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.jboss.netty.handler.codec.http.HttpMethod; + +public class NoopDruidLeaderClient implements LeaderClient +{ + + @Override + public void start() + { + //Noop + } + + @Override + public void stop() + { + //Noop + } + + @Override + public Request makeRequest( + HttpMethod httpMethod, String urlPath + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public FullResponseHolder go( + Request request, HttpResponseHandler responseHandler + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture goAsync( + Request request, HttpResponseHandler handler + ) + { + throw new UnsupportedOperationException(); + } + +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java index 8096ddd4910..905ad85b1e4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java @@ -37,7 +37,7 @@ import org.apache.calcite.util.DateString; import org.apache.calcite.util.TimeString; import org.apache.calcite.util.TimestampString; import org.apache.druid.client.TimelineServerView; -import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.LeaderClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -105,8 +105,8 @@ public class Calcites final TimelineServerView serverView, final DruidSchema druidSchema, final AuthorizerMapper authorizerMapper, - final DruidLeaderClient coordinatorDruidLeaderClient, - final DruidLeaderClient overlordDruidLeaderClient, + final LeaderClient coordinatorDruidLeaderClient, + final LeaderClient overlordDruidLeaderClient, final ObjectMapper jsonMapper ) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java index 4c203964753..59e694d3387 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java @@ -38,7 +38,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; -import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.LeaderClient; import org.apache.druid.guice.annotations.Json; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.server.QueryLifecycleFactory; @@ -68,8 +68,8 @@ public class PlannerFactory private final PlannerConfig plannerConfig; private final ObjectMapper jsonMapper; private final AuthorizerMapper authorizerMapper; - private final DruidLeaderClient coordinatorDruidLeaderClient; - private final DruidLeaderClient overlordDruidLeaderClient; + private final LeaderClient coordinatorDruidLeaderClient; + private final LeaderClient overlordDruidLeaderClient; @Inject public PlannerFactory( @@ -81,8 +81,8 @@ public class PlannerFactory final PlannerConfig plannerConfig, final AuthorizerMapper authorizerMapper, final @Json ObjectMapper jsonMapper, - final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, - final @IndexingService DruidLeaderClient overlordDruidLeaderClient + final @Coordinator LeaderClient coordinatorDruidLeaderClient, + final @IndexingService LeaderClient overlordDruidLeaderClient ) { this.druidSchema = druidSchema; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index d782481af77..5e74a0ef11f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; @@ -46,7 +45,7 @@ import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; -import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.discovery.LeaderClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -62,6 +61,7 @@ import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; @@ -158,8 +158,8 @@ public class SystemSchema extends AbstractSchema final DruidSchema druidSchema, final TimelineServerView serverView, final AuthorizerMapper authorizerMapper, - final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, - final @IndexingService DruidLeaderClient overlordDruidLeaderClient, + final @Coordinator LeaderClient coordinatorDruidLeaderClient, + final @IndexingService LeaderClient overlordDruidLeaderClient, final ObjectMapper jsonMapper ) { @@ -182,14 +182,14 @@ public class SystemSchema extends AbstractSchema static class SegmentsTable extends AbstractTable implements ScannableTable { private final DruidSchema druidSchema; - private final DruidLeaderClient druidLeaderClient; + private final LeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; private final BytesAccumulatingResponseHandler responseHandler; private final AuthorizerMapper authorizerMapper; public SegmentsTable( DruidSchema druidSchemna, - DruidLeaderClient druidLeaderClient, + LeaderClient druidLeaderClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, AuthorizerMapper authorizerMapper @@ -343,7 +343,7 @@ public class SystemSchema extends AbstractSchema // Note that coordinator must be up to get segments private static JsonParserIterator getMetadataSegments( - DruidLeaderClient coordinatorClient, + LeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler ) @@ -424,7 +424,7 @@ public class SystemSchema extends AbstractSchema authorizerMapper ); if (!access.isAllowed()) { - return Linq4j.asEnumerable(ImmutableList.of()); + throw new ForbiddenException("Insufficient permission to view servers :" + access.toString()); } final FluentIterable results = FluentIterable .from(druidServers) @@ -486,13 +486,13 @@ public class SystemSchema extends AbstractSchema static class TasksTable extends AbstractTable implements ScannableTable { - private final DruidLeaderClient druidLeaderClient; + private final LeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; private final BytesAccumulatingResponseHandler responseHandler; private final AuthorizerMapper authorizerMapper; public TasksTable( - DruidLeaderClient druidLeaderClient, + LeaderClient druidLeaderClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, AuthorizerMapper authorizerMapper @@ -609,7 +609,7 @@ public class SystemSchema extends AbstractSchema //Note that overlord must be up to get tasks private static JsonParserIterator getTasks( - DruidLeaderClient indexingServiceClient, + LeaderClient indexingServiceClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler )