mirror of https://github.com/apache/druid.git
* Revert "remove ServerDiscoverySelector from DruidLeaderClient (#9481)"
This reverts commit 072bbe210f
.
* fix build
This commit is contained in:
parent
cda9f41e69
commit
b8f7128b2d
|
@ -22,6 +22,8 @@ package org.apache.druid.discovery;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.druid.client.selector.DiscoverySelector;
|
||||
import org.apache.druid.client.selector.Server;
|
||||
import org.apache.druid.concurrent.LifecycleLock;
|
||||
import org.apache.druid.java.util.common.IOE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
|
@ -70,6 +72,9 @@ public class DruidLeaderClient
|
|||
|
||||
private final String leaderRequestPath;
|
||||
|
||||
//Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future.
|
||||
private final DiscoverySelector<Server> serverDiscoverySelector;
|
||||
|
||||
private LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
private DruidNodeDiscovery druidNodeDiscovery;
|
||||
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
|
||||
|
@ -78,13 +83,15 @@ public class DruidLeaderClient
|
|||
HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
NodeRole nodeRoleToWatch,
|
||||
String leaderRequestPath
|
||||
String leaderRequestPath,
|
||||
DiscoverySelector<Server> serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
|
||||
this.nodeRoleToWatch = nodeRoleToWatch;
|
||||
this.leaderRequestPath = leaderRequestPath;
|
||||
this.serverDiscoverySelector = serverDiscoverySelector;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
@ -296,6 +303,16 @@ public class DruidLeaderClient
|
|||
@Nullable
|
||||
private String pickOneHost()
|
||||
{
|
||||
Server server = serverDiscoverySelector.pick();
|
||||
if (server != null) {
|
||||
return StringUtils.format(
|
||||
"%s://%s:%s",
|
||||
server.getScheme(),
|
||||
server.getAddress(),
|
||||
server.getPort()
|
||||
);
|
||||
}
|
||||
|
||||
Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator();
|
||||
if (iter.hasNext()) {
|
||||
DiscoveryDruidNode node = iter.next();
|
||||
|
|
|
@ -24,6 +24,8 @@ import com.google.inject.Module;
|
|||
import com.google.inject.Provides;
|
||||
import org.apache.druid.client.coordinator.Coordinator;
|
||||
import org.apache.druid.client.coordinator.CoordinatorSelectorConfig;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import org.apache.druid.discovery.DruidLeaderClient;
|
||||
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
|
@ -40,19 +42,32 @@ public class CoordinatorDiscoveryModule implements Module
|
|||
JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Coordinator
|
||||
@ManageLifecycle
|
||||
public ServerDiscoverySelector getServiceProvider(
|
||||
CoordinatorSelectorConfig config,
|
||||
ServerDiscoveryFactory serverDiscoveryFactory
|
||||
)
|
||||
{
|
||||
return serverDiscoveryFactory.createSelector(config.getServiceName());
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Coordinator
|
||||
@ManageLifecycle
|
||||
public DruidLeaderClient getLeaderHttpClient(
|
||||
@EscalatedGlobal HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
@Coordinator ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
return new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.COORDINATOR,
|
||||
"/druid/coordinator/v1/leader"
|
||||
"/druid/coordinator/v1/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import com.google.inject.Module;
|
|||
import com.google.inject.Provides;
|
||||
import org.apache.druid.client.indexing.IndexingService;
|
||||
import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import org.apache.druid.discovery.DruidLeaderClient;
|
||||
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
|
@ -40,19 +42,32 @@ public class IndexingServiceDiscoveryModule implements Module
|
|||
JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@IndexingService
|
||||
@ManageLifecycle
|
||||
public ServerDiscoverySelector getServiceProvider(
|
||||
IndexingServiceSelectorConfig config,
|
||||
ServerDiscoveryFactory serverDiscoveryFactory
|
||||
)
|
||||
{
|
||||
return serverDiscoveryFactory.createSelector(config.getServiceName());
|
||||
}
|
||||
|
||||
@Provides
|
||||
@IndexingService
|
||||
@ManageLifecycle
|
||||
public DruidLeaderClient getLeaderHttpClient(
|
||||
@EscalatedGlobal HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
@IndexingService ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
return new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.OVERLORD,
|
||||
"/druid/indexer/v1/leader"
|
||||
"/druid/indexer/v1/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.google.inject.Module;
|
|||
import com.google.inject.name.Named;
|
||||
import com.google.inject.name.Names;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import org.apache.druid.guice.GuiceInjectors;
|
||||
import org.apache.druid.guice.Jerseys;
|
||||
import org.apache.druid.guice.JsonConfigProvider;
|
||||
|
@ -122,7 +123,8 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.PEON,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
|
@ -146,7 +148,8 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.PEON,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
|
@ -172,7 +175,8 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.PEON,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
|
@ -184,6 +188,9 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
@Test
|
||||
public void testServerFailureAndRedirect() throws Exception
|
||||
{
|
||||
ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class);
|
||||
EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes();
|
||||
|
||||
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
DiscoveryDruidNode dummyNode = new DiscoveryDruidNode(
|
||||
new DruidNode("test", "dummyhost", false, 64231, null, true, false),
|
||||
|
@ -196,13 +203,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery).anyTimes();
|
||||
|
||||
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
|
||||
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.PEON,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
|
@ -228,7 +236,8 @@ public class DruidLeaderClientTest extends BaseJettyTest
|
|||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
NodeRole.PEON,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
|
|
|
@ -360,7 +360,7 @@ public class CompactSegmentsTest
|
|||
|
||||
private TestDruidLeaderClient(ObjectMapper jsonMapper)
|
||||
{
|
||||
super(null, new TestNodeDiscoveryProvider(), null, null);
|
||||
super(null, new TestNodeDiscoveryProvider(), null, null, null);
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
|
|
|
@ -773,7 +773,10 @@ public class CalciteTests
|
|||
new FakeHttpClient(),
|
||||
provider,
|
||||
NodeRole.COORDINATOR,
|
||||
"/simple/leader"
|
||||
"/simple/leader",
|
||||
() -> {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
);
|
||||
|
||||
return new SystemSchema(
|
||||
|
|
Loading…
Reference in New Issue