Added LeaderClient interface and NoopDruidLeaderClient class

This commit is contained in:
Surekha Saharan 2018-10-02 16:25:56 -07:00
parent f53600ff1f
commit 100fa46e39
7 changed files with 153 additions and 22 deletions

View File

@ -27,7 +27,8 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.Row;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.LeaderClient;
import org.apache.druid.discovery.NoopDruidLeaderClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -114,7 +115,7 @@ public class SqlBenchmark
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
final LeaderClient druidLeaderClient = new NoopDruidLeaderClient();
this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
plannerFactory = new PlannerFactory(

View File

@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
* request.setXXX(..)
* FullResponseHolder responseHolder = druidLeaderClient.go(request)
*/
public class DruidLeaderClient
public class DruidLeaderClient implements LeaderClient
{
private final Logger log = new Logger(DruidLeaderClient.class);
@ -93,6 +93,7 @@ public class DruidLeaderClient
this.serverDiscoverySelector = serverDiscoverySelector;
}
@Override
@LifecycleStart
public void start()
{
@ -110,6 +111,7 @@ public class DruidLeaderClient
}
}
@Override
@LifecycleStop
public void stop()
{
@ -123,6 +125,7 @@ public class DruidLeaderClient
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
@Override
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
@ -137,6 +140,7 @@ public class DruidLeaderClient
/**
* Executes the request object aimed at the leader and process the response with given handler
*/
@Override
public <Intermediate, Final> ListenableFuture<Final> goAsync(
final Request request,
final HttpResponseHandler<Intermediate, Final> handler
@ -148,6 +152,7 @@ public class DruidLeaderClient
/**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
@Override
public FullResponseHolder go(
Request request,
HttpResponseHandler<FullResponseHolder, FullResponseHolder> responseHandler

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.io.IOException;
/**
* This is used to facilitate interaction with Coordinator/Overlord leader nodes. Instance of this interface is injected
* via Guice with annotations @Coordinator or @IndexingService .
*/
public interface LeaderClient
{
void start();
void stop();
/**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException;
/**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
FullResponseHolder go(Request request, HttpResponseHandler<FullResponseHolder, FullResponseHolder> responseHandler)
throws IOException, InterruptedException;
/**
* Executes the request object aimed at the leader and process the response with given handler
*/
<Intermediate, Final> ListenableFuture<Final> goAsync(
Request request,
HttpResponseHandler<Intermediate, Final> handler
);
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.jboss.netty.handler.codec.http.HttpMethod;
public class NoopDruidLeaderClient implements LeaderClient
{
@Override
public void start()
{
//Noop
}
@Override
public void stop()
{
//Noop
}
@Override
public Request makeRequest(
HttpMethod httpMethod, String urlPath
)
{
throw new UnsupportedOperationException();
}
@Override
public FullResponseHolder go(
Request request, HttpResponseHandler<FullResponseHolder, FullResponseHolder> responseHandler
)
{
throw new UnsupportedOperationException();
}
@Override
public <Intermediate, Final> ListenableFuture<Final> goAsync(
Request request, HttpResponseHandler<Intermediate, Final> handler
)
{
throw new UnsupportedOperationException();
}
}

View File

@ -37,7 +37,7 @@ import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
import org.apache.calcite.util.TimestampString;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.LeaderClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -105,8 +105,8 @@ public class Calcites
final TimelineServerView serverView,
final DruidSchema druidSchema,
final AuthorizerMapper authorizerMapper,
final DruidLeaderClient coordinatorDruidLeaderClient,
final DruidLeaderClient overlordDruidLeaderClient,
final LeaderClient coordinatorDruidLeaderClient,
final LeaderClient overlordDruidLeaderClient,
final ObjectMapper jsonMapper
)
{

View File

@ -38,7 +38,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.LeaderClient;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.server.QueryLifecycleFactory;
@ -68,8 +68,8 @@ public class PlannerFactory
private final PlannerConfig plannerConfig;
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final DruidLeaderClient coordinatorDruidLeaderClient;
private final DruidLeaderClient overlordDruidLeaderClient;
private final LeaderClient coordinatorDruidLeaderClient;
private final LeaderClient overlordDruidLeaderClient;
@Inject
public PlannerFactory(
@ -81,8 +81,8 @@ public class PlannerFactory
final PlannerConfig plannerConfig,
final AuthorizerMapper authorizerMapper,
final @Json ObjectMapper jsonMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
final @IndexingService DruidLeaderClient overlordDruidLeaderClient
final @Coordinator LeaderClient coordinatorDruidLeaderClient,
final @IndexingService LeaderClient overlordDruidLeaderClient
)
{
this.druidSchema = druidSchema;

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
@ -46,7 +45,7 @@ import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.LeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -62,6 +61,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
@ -158,8 +158,8 @@ public class SystemSchema extends AbstractSchema
final DruidSchema druidSchema,
final TimelineServerView serverView,
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
final @IndexingService DruidLeaderClient overlordDruidLeaderClient,
final @Coordinator LeaderClient coordinatorDruidLeaderClient,
final @IndexingService LeaderClient overlordDruidLeaderClient,
final ObjectMapper jsonMapper
)
{
@ -182,14 +182,14 @@ public class SystemSchema extends AbstractSchema
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
private final DruidLeaderClient druidLeaderClient;
private final LeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper;
public SegmentsTable(
DruidSchema druidSchemna,
DruidLeaderClient druidLeaderClient,
LeaderClient druidLeaderClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper
@ -343,7 +343,7 @@ public class SystemSchema extends AbstractSchema
// Note that coordinator must be up to get segments
private static JsonParserIterator<DataSegment> getMetadataSegments(
DruidLeaderClient coordinatorClient,
LeaderClient coordinatorClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler
)
@ -424,7 +424,7 @@ public class SystemSchema extends AbstractSchema
authorizerMapper
);
if (!access.isAllowed()) {
return Linq4j.asEnumerable(ImmutableList.of());
throw new ForbiddenException("Insufficient permission to view servers :" + access.toString());
}
final FluentIterable<Object[]> results = FluentIterable
.from(druidServers)
@ -486,13 +486,13 @@ public class SystemSchema extends AbstractSchema
static class TasksTable extends AbstractTable implements ScannableTable
{
private final DruidLeaderClient druidLeaderClient;
private final LeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper;
public TasksTable(
DruidLeaderClient druidLeaderClient,
LeaderClient druidLeaderClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper
@ -609,7 +609,7 @@ public class SystemSchema extends AbstractSchema
//Note that overlord must be up to get tasks
private static JsonParserIterator<TaskStatusPlus> getTasks(
DruidLeaderClient indexingServiceClient,
LeaderClient indexingServiceClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler
)