mirror of https://github.com/apache/druid.git
refactor NodeRole so extensions can participate in disco and announcement (#10700)
* refactor NodeRole so extensions can participate in disco and announcement * fixes, maybe * retries * javadoc * fix * spelling
This commit is contained in:
parent
797371598d
commit
74fbdd322d
|
@ -59,6 +59,7 @@ getConfPath() {
|
|||
coordinator | overlord) echo $cluster_conf_base/master/coordinator-overlord ;;
|
||||
broker) echo $cluster_conf_base/query/broker ;;
|
||||
router) echo $cluster_conf_base/query/router ;;
|
||||
*) echo $cluster_conf_base/misc/$1 ;;
|
||||
esac
|
||||
}
|
||||
COMMON_CONF_DIR=$(getConfPath _common)
|
||||
|
|
|
@ -57,6 +57,7 @@ services:
|
|||
- druid_manager_segments_pollDuration=PT10S
|
||||
- druid_coordinator_period=PT10S
|
||||
depends_on:
|
||||
- druid-coordinator
|
||||
- druid-metadata-storage
|
||||
- druid-zookeeper-kafka
|
||||
|
||||
|
@ -113,6 +114,35 @@ services:
|
|||
- druid-overlord-two
|
||||
- druid-broker
|
||||
|
||||
druid-custom-node-role:
|
||||
image: druid/cluster
|
||||
container_name: druid-custom-node-role
|
||||
networks:
|
||||
druid-it-net:
|
||||
ipv4_address: 172.172.172.90
|
||||
ports:
|
||||
- 50011:50011
|
||||
- 9301:9301
|
||||
- 9501:9501
|
||||
privileged: true
|
||||
volumes:
|
||||
- ${HOME}/shared:/shared
|
||||
- ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
|
||||
environment:
|
||||
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||
- DRUID_SERVICE=custom-node-role
|
||||
- DRUID_LOG_PATH=/shared/logs/custom-node-role.log
|
||||
- SERVICE_DRUID_JAVA_OPTS=-server -Xmx32m -Xms32m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5011
|
||||
- druid_host=druid-custom-node-role
|
||||
- druid_auth_basic_common_cacheDirectory=/tmp/authCache/custom_node_role
|
||||
- druid_server_https_crlPath=/tls/revocations.crl
|
||||
env_file:
|
||||
- ./environment-configs/common
|
||||
depends_on:
|
||||
- druid-zookeeper-kafka
|
||||
- druid-coordinator
|
||||
- druid-coordinator-two
|
||||
|
||||
networks:
|
||||
druid-it-net:
|
||||
name: druid-it-net
|
||||
|
|
|
@ -30,6 +30,7 @@ getConfPath()
|
|||
broker) echo $cluster_conf_base/query/broker ;;
|
||||
router) echo $cluster_conf_base/query/router ;;
|
||||
overlord) echo $cluster_conf_base/master/overlord ;;
|
||||
*) echo $cluster_conf_base/misc/$1 ;;
|
||||
esac
|
||||
}
|
||||
|
||||
|
@ -91,7 +92,7 @@ setupData()
|
|||
# below s3 credentials needed to access the pre-existing s3 bucket
|
||||
setKey $DRUID_SERVICE druid.s3.accessKey AKIAJI7DG7CDECGBQ6NA
|
||||
setKey $DRUID_SERVICE druid.s3.secretKey OBaLISDFjKLajSTrJ53JoTtzTZLjPlRePcwa+Pjv
|
||||
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]; then
|
||||
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]; then
|
||||
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\",\"druid-integration-tests\"]
|
||||
else
|
||||
setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
|
||||
|
|
|
@ -299,6 +299,21 @@
|
|||
<classifier>osx-x86_64</classifier>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-servlet</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-server</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.inject.extensions</groupId>
|
||||
<artifactId>guice-servlet</artifactId>
|
||||
<version>${guice.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.cli;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.name.Names;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import io.airlift.airline.Command;
|
||||
import org.apache.druid.client.coordinator.CoordinatorClient;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
import org.apache.druid.guice.Jerseys;
|
||||
import org.apache.druid.guice.LazySingleton;
|
||||
import org.apache.druid.guice.LifecycleModule;
|
||||
import org.apache.druid.guice.annotations.Json;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.server.http.SelfDiscoveryResource;
|
||||
import org.apache.druid.server.initialization.ServerConfig;
|
||||
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
|
||||
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
import org.apache.druid.server.security.AuthenticationUtils;
|
||||
import org.apache.druid.server.security.Authenticator;
|
||||
import org.apache.druid.server.security.AuthenticatorMapper;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.server.handler.StatisticsHandler;
|
||||
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Command(
|
||||
name = CliCustomNodeRole.SERVICE_NAME,
|
||||
description = "Some custom druid node role defined in an extension"
|
||||
)
|
||||
public class CliCustomNodeRole extends ServerRunnable
|
||||
{
|
||||
private static final Logger LOG = new Logger(CliCustomNodeRole.class);
|
||||
|
||||
public static final String SERVICE_NAME = "custom-node-role";
|
||||
public static final int PORT = 9301;
|
||||
public static final int TLS_PORT = 9501;
|
||||
|
||||
public CliCustomNodeRole()
|
||||
{
|
||||
super(LOG);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<? extends Module> getModules()
|
||||
{
|
||||
return ImmutableList.of(
|
||||
binder -> {
|
||||
LOG.info("starting up");
|
||||
binder.bindConstant().annotatedWith(Names.named("serviceName")).to(CliCustomNodeRole.SERVICE_NAME);
|
||||
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(CliCustomNodeRole.PORT);
|
||||
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(CliCustomNodeRole.TLS_PORT);
|
||||
|
||||
binder.bind(CoordinatorClient.class).in(LazySingleton.class);
|
||||
|
||||
binder.bind(JettyServerInitializer.class).to(CustomJettyServiceInitializer.class).in(LazySingleton.class);
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
|
||||
bindNodeRoleAndAnnouncer(
|
||||
binder,
|
||||
DiscoverySideEffectsProvider.builder(new NodeRole(CliCustomNodeRole.SERVICE_NAME)).build()
|
||||
);
|
||||
Jerseys.addResource(binder, SelfDiscoveryResource.class);
|
||||
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// ugly mimic of other jetty initializers
|
||||
private static class CustomJettyServiceInitializer implements JettyServerInitializer
|
||||
{
|
||||
private static List<String> UNSECURED_PATHS = ImmutableList.of(
|
||||
"/status/health"
|
||||
);
|
||||
|
||||
private final ServerConfig serverConfig;
|
||||
|
||||
@Inject
|
||||
public CustomJettyServiceInitializer(ServerConfig serverConfig)
|
||||
{
|
||||
this.serverConfig = serverConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(Server server, Injector injector)
|
||||
{
|
||||
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||
|
||||
final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
|
||||
final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class);
|
||||
|
||||
AuthenticationUtils.addSecuritySanityCheckFilter(root, jsonMapper);
|
||||
|
||||
// perform no-op authorization for these resources
|
||||
AuthenticationUtils.addNoopAuthenticationAndAuthorizationFilters(root, UNSECURED_PATHS);
|
||||
|
||||
List<Authenticator> authenticators = authenticatorMapper.getAuthenticatorChain();
|
||||
AuthenticationUtils.addAuthenticationFilterChain(root, authenticators);
|
||||
|
||||
JettyServerInitUtils.addAllowHttpMethodsFilter(root, serverConfig.getAllowedHttpMethods());
|
||||
|
||||
JettyServerInitUtils.addExtensionFilters(root, injector);
|
||||
|
||||
// Check that requests were authorized before sending responses
|
||||
AuthenticationUtils.addPreResponseAuthorizationCheckFilter(
|
||||
root,
|
||||
authenticators,
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
root.addFilter(GuiceFilter.class, "/*", null);
|
||||
|
||||
final HandlerList handlerList = new HandlerList();
|
||||
// Do not change the order of the handlers that have already been added
|
||||
for (Handler handler : server.getHandlers()) {
|
||||
handlerList.addHandler(handler);
|
||||
}
|
||||
|
||||
handlerList.addHandler(JettyServerInitUtils.getJettyRequestLogHandler());
|
||||
|
||||
// Add Gzip handler at the very end
|
||||
handlerList.addHandler(
|
||||
JettyServerInitUtils.wrapWithDefaultGzipHandler(
|
||||
root,
|
||||
serverConfig.getInflateBufferSize(),
|
||||
serverConfig.getCompressionLevel()
|
||||
)
|
||||
);
|
||||
|
||||
final StatisticsHandler statisticsHandler = new StatisticsHandler();
|
||||
statisticsHandler.setHandler(handlerList);
|
||||
|
||||
server.setHandler(statisticsHandler);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.cli;
|
||||
|
||||
import io.airlift.airline.Cli;
|
||||
|
||||
public class CustomNodeRoleCommandCreator implements CliCommandCreator
|
||||
{
|
||||
@Override
|
||||
public void addCommands(Cli.CliBuilder builder)
|
||||
{
|
||||
builder.withGroup("server").withCommands(CliCustomNodeRole.class);
|
||||
}
|
||||
}
|
|
@ -516,6 +516,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
|
|||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDocker()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
|
|||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -387,6 +388,19 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
|
|||
{
|
||||
return streamEndpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDocker()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getDockerHost()
|
||||
{
|
||||
return dockerIp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.testing;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -162,4 +163,12 @@ public interface IntegrationTestingConfig
|
|||
String getHadoopGcsCredentialsPath();
|
||||
|
||||
String getStreamEndpoint();
|
||||
|
||||
boolean isDocker();
|
||||
|
||||
@Nullable
|
||||
default String getDockerHost()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,4 +13,5 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.druid.cli.QueryRetryTestCommandCreator
|
||||
org.apache.druid.cli.QueryRetryTestCommandCreator
|
||||
org.apache.druid.cli.CustomNodeRoleCommandCreator
|
||||
|
|
|
@ -391,8 +391,9 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
for (DataSegment compactedSegment : foundCompactedSegments) {
|
||||
Assert.assertNotNull(compactedSegment.getLastCompactionState());
|
||||
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
|
||||
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
|
||||
SecondaryPartitionType.LINEAR
|
||||
Assert.assertEquals(
|
||||
compactedSegment.getLastCompactionState().getPartitionsSpec().getType(),
|
||||
SecondaryPartitionType.LINEAR
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,11 +20,18 @@
|
|||
package org.apache.druid.tests.leadership;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.cli.CliCustomNodeRole;
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import org.apache.druid.discovery.DiscoveryDruidNode;
|
||||
import org.apache.druid.discovery.DruidNodeDiscovery;
|
||||
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import org.apache.druid.discovery.NodeRole;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.http.client.HttpClient;
|
||||
import org.apache.druid.java.util.http.client.Request;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
||||
|
@ -34,6 +41,7 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
|||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.guice.TestClient;
|
||||
import org.apache.druid.testing.utils.DruidClusterAdminClient;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.apache.druid.testing.utils.SqlTestQueryHelper;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.apache.druid.tests.indexer.AbstractIndexerTest;
|
||||
|
@ -43,12 +51,18 @@ import org.testng.Assert;
|
|||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Test(groups = TestNGGroup.HIGH_AVAILABILTY)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITHighAvailabilityTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITHighAvailabilityTest.class);
|
||||
private static final String SYSTEM_QUERIES_RESOURCE = "/queries/high_availability_sys.json";
|
||||
private static final int NUM_LEADERSHIP_SWAPS = 3;
|
||||
|
||||
|
@ -61,6 +75,9 @@ public class ITHighAvailabilityTest
|
|||
@Inject
|
||||
ServerDiscoveryFactory factory;
|
||||
|
||||
@Inject
|
||||
DruidNodeDiscoveryProvider druidNodeDiscovery;
|
||||
|
||||
@Inject
|
||||
CoordinatorResourceTestClient coordinatorClient;
|
||||
|
||||
|
@ -104,6 +121,75 @@ public class ITHighAvailabilityTest
|
|||
} while (runCount++ < NUM_LEADERSHIP_SWAPS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiscoveryAndSelfDiscovery()
|
||||
{
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> {
|
||||
List<DruidNodeDiscovery> disco = ImmutableList.of(
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.COORDINATOR),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.OVERLORD),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.HISTORICAL),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.MIDDLE_MANAGER),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.INDEXER),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.BROKER),
|
||||
druidNodeDiscovery.getForNodeRole(NodeRole.ROUTER)
|
||||
);
|
||||
|
||||
int servicesDiscovered = 0;
|
||||
for (DruidNodeDiscovery nodeRole : disco) {
|
||||
Collection<DiscoveryDruidNode> nodes = nodeRole.getAllNodes();
|
||||
servicesDiscovered += testSelfDiscovery(nodes);
|
||||
}
|
||||
return servicesDiscovered > 5;
|
||||
},
|
||||
true,
|
||||
TimeUnit.SECONDS.toMillis(5),
|
||||
60,
|
||||
"Standard services discovered"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomDiscovery()
|
||||
{
|
||||
ITRetryUtil.retryUntil(
|
||||
() -> {
|
||||
DruidNodeDiscovery customDisco =
|
||||
druidNodeDiscovery.getForNodeRole(new NodeRole(CliCustomNodeRole.SERVICE_NAME));
|
||||
int count = testSelfDiscovery(customDisco.getAllNodes());
|
||||
return count > 0;
|
||||
},
|
||||
true,
|
||||
TimeUnit.SECONDS.toMillis(5),
|
||||
60,
|
||||
"Custom service discovered"
|
||||
);
|
||||
}
|
||||
|
||||
private int testSelfDiscovery(Collection<DiscoveryDruidNode> nodes)
|
||||
throws MalformedURLException, ExecutionException, InterruptedException
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
for (DiscoveryDruidNode node : nodes) {
|
||||
final String location = StringUtils.format(
|
||||
"http://%s:%s/status/selfDiscovered",
|
||||
config.isDocker() ? config.getDockerHost() : node.getDruidNode().getHost(),
|
||||
node.getDruidNode().getPlaintextPort()
|
||||
);
|
||||
LOG.info("testing self discovery %s", location);
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.GET, new URL(location)),
|
||||
StatusResponseHandler.getInstance()
|
||||
).get();
|
||||
LOG.info("%s responded with %s", location, response.getStatus().getCode());
|
||||
Assert.assertEquals(response.getStatus(), HttpResponseStatus.OK);
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private void swapLeadersAndWait(String coordinatorLeader, String overlordLeader)
|
||||
{
|
||||
Runnable waitUntilCoordinatorSupplier;
|
||||
|
@ -193,7 +279,6 @@ public class ITHighAvailabilityTest
|
|||
return working;
|
||||
}
|
||||
|
||||
|
||||
private static boolean isCoordinatorOneLeader(IntegrationTestingConfig config, String coordinatorLeader)
|
||||
{
|
||||
return coordinatorLeader.contains(transformHost(config.getCoordinatorInternalHost()));
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ITJdbcQueryTest
|
|||
catalogs.add(catalog);
|
||||
}
|
||||
LOG.info("catalogs %s", catalogs);
|
||||
Assert.assertEquals(ImmutableList.of("druid"), catalogs);
|
||||
Assert.assertEquals(catalogs, ImmutableList.of("druid"));
|
||||
|
||||
Set<String> schemas = new HashSet<>();
|
||||
ResultSet schemasMetadata = metadata.getSchemas("druid", null);
|
||||
|
@ -180,7 +180,7 @@ public class ITJdbcQueryTest
|
|||
resultRowCount++;
|
||||
LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
|
||||
}
|
||||
Assert.assertEquals(10, resultRowCount);
|
||||
Assert.assertEquals(resultRowCount, 10);
|
||||
resultSet.close();
|
||||
}
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ public class ITJdbcQueryTest
|
|||
resultRowCount++;
|
||||
LOG.info("%s,%s,%s", resultSet.getString(1), resultSet.getLong(2), resultSet.getLong(3));
|
||||
}
|
||||
Assert.assertEquals(10, resultRowCount);
|
||||
Assert.assertEquals(resultRowCount, 10);
|
||||
resultSet.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -191,20 +191,20 @@ public class ITQueryRetryTestOnMissingSegments
|
|||
|
||||
switch (expectation) {
|
||||
case ALL_SUCCESS:
|
||||
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, querySuccess);
|
||||
Assert.assertEquals(0, queryFailure);
|
||||
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, resultMatches);
|
||||
Assert.assertEquals(0, resultMismatches);
|
||||
Assert.assertEquals(querySuccess, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
||||
Assert.assertEquals(queryFailure, 0);
|
||||
Assert.assertEquals(resultMatches, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
||||
Assert.assertEquals(resultMismatches, 0);
|
||||
break;
|
||||
case QUERY_FAILURE:
|
||||
Assert.assertTrue(querySuccess > 0, "At least one query is expected to succeed.");
|
||||
Assert.assertTrue(queryFailure > 0, "At least one query is expected to fail.");
|
||||
Assert.assertEquals(querySuccess, resultMatches);
|
||||
Assert.assertEquals(0, resultMismatches);
|
||||
Assert.assertEquals(resultMismatches, 0);
|
||||
break;
|
||||
case INCORRECT_RESULT:
|
||||
Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, querySuccess);
|
||||
Assert.assertEquals(0, queryFailure);
|
||||
Assert.assertEquals(querySuccess, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
||||
Assert.assertEquals(queryFailure, 0);
|
||||
Assert.assertTrue(resultMatches > 0, "At least one query is expected to return correct results.");
|
||||
Assert.assertTrue(resultMismatches > 0, "At least one query is expected to return less results.");
|
||||
break;
|
||||
|
|
|
@ -123,14 +123,14 @@ public class ITWikipediaQueryTest
|
|||
getQueryBuilder().build()
|
||||
).get();
|
||||
|
||||
Assert.assertEquals(HttpResponseStatus.OK.getCode(), followUp.getStatus().getCode());
|
||||
Assert.assertEquals(followUp.getStatus().getCode(), HttpResponseStatus.OK.getCode());
|
||||
|
||||
StatusResponseHolder andAnother = queryClient.queryAsync(
|
||||
queryHelper.getQueryURL(config.getBrokerUrl()),
|
||||
getQueryBuilder().build()
|
||||
).get();
|
||||
|
||||
Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode());
|
||||
Assert.assertEquals(andAnother.getStatus().getCode(), HttpResponseStatus.OK.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -39,6 +39,6 @@ public class ITCoordinatorOverlordProxyAuthTest
|
|||
public void testProxyAuth()
|
||||
{
|
||||
HttpResponseStatus responseStatus = coordinatorClient.getProxiedOverlordScalingResponseStatus();
|
||||
Assert.assertEquals(HttpResponseStatus.OK, responseStatus);
|
||||
Assert.assertEquals(responseStatus, HttpResponseStatus.OK);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,45 +19,136 @@
|
|||
|
||||
package org.apache.druid.discovery;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This is a historical occasion that this enum is different from {@link
|
||||
* org.apache.druid.server.coordination.ServerType} (also called "node type" in various places) because they are
|
||||
* essentially the same abstraction, but merging them could only increase the complexity and drop the code safety,
|
||||
* because they name the same types differently ("peon" - "indexer-executor" and "middleManager" - "realtime") and both
|
||||
* expose them via JSON APIs.
|
||||
* Defines the 'role' of a Druid service, utilized to strongly type announcement and service discovery.
|
||||
*
|
||||
* These abstractions can probably be merged when Druid updates to Jackson 2.9 that supports JsonAliases, see
|
||||
* Originally, this was an enum to add type safety for discovery and announcement purposes, but was expanded
|
||||
* into a class to allow for extensibility while retaining the type safety of using defined types instead of raw
|
||||
* strings. As such, this class tries to mimic the interface provided by the previous enum.
|
||||
*
|
||||
* Built in node roles define a {@link #name} that is distinct from {@link #jsonName}, and is the previous value
|
||||
* which would occur when the enum was used in a 'toString' context. Custom node roles allow extension to participate
|
||||
* in announcement and discovery, but are limited to only using {@link #jsonName} for both toString and JSON serde.
|
||||
*
|
||||
* The historical context of why the enum was different from {@link org.apache.druid.server.coordination.ServerType}
|
||||
* (also called "node type" in various places) is because while they are essentially the same abstraction, merging them
|
||||
* could only increase the complexity and drop the code safety, because they name the same types differently
|
||||
* ("peon" - "indexer-executor" and "middleManager" - "realtime") and both expose them via JSON APIs.
|
||||
*
|
||||
* These abstractions can all potentially be merged when Druid updates to Jackson 2.9 that supports JsonAliases,
|
||||
* see https://github.com/apache/druid/issues/7152.
|
||||
*/
|
||||
public enum NodeRole
|
||||
public class NodeRole
|
||||
{
|
||||
COORDINATOR("coordinator"),
|
||||
HISTORICAL("historical"),
|
||||
BROKER("broker"),
|
||||
OVERLORD("overlord"),
|
||||
PEON("peon"),
|
||||
ROUTER("router"),
|
||||
MIDDLE_MANAGER("middleManager"),
|
||||
INDEXER("indexer");
|
||||
public static final NodeRole COORDINATOR = new NodeRole("COORDINATOR", "coordinator");
|
||||
public static final NodeRole HISTORICAL = new NodeRole("HISTORICAL", "historical");
|
||||
public static final NodeRole BROKER = new NodeRole("BROKER", "broker");
|
||||
public static final NodeRole OVERLORD = new NodeRole("OVERLORD", "overlord");
|
||||
public static final NodeRole PEON = new NodeRole("PEON", "peon");
|
||||
public static final NodeRole ROUTER = new NodeRole("ROUTER", "router");
|
||||
public static final NodeRole MIDDLE_MANAGER = new NodeRole("MIDDLE_MANAGER", "middleManager");
|
||||
public static final NodeRole INDEXER = new NodeRole("INDEXER", "indexer");
|
||||
|
||||
private static final NodeRole[] BUILT_IN = new NodeRole[]{
|
||||
COORDINATOR,
|
||||
HISTORICAL,
|
||||
BROKER,
|
||||
OVERLORD,
|
||||
PEON,
|
||||
ROUTER,
|
||||
MIDDLE_MANAGER,
|
||||
INDEXER
|
||||
};
|
||||
|
||||
private static final Map<String, NodeRole> BUILT_IN_LOOKUP =
|
||||
Arrays.stream(BUILT_IN).collect(Collectors.toMap(NodeRole::getJsonName, Function.identity()));
|
||||
|
||||
/**
|
||||
* For built-in roles, to preserve backwards compatibility when this was an enum, this provides compatibility for
|
||||
* usages of the enum name as a string, (e.g. allcaps 'COORDINATOR'), which is used by system tables for displaying
|
||||
* node role, and by curator discovery for the discovery path of a node role (the actual payload at the zk location
|
||||
* uses {@link #jsonName})
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* JSON serialized value for {@link NodeRole}
|
||||
*/
|
||||
private final String jsonName;
|
||||
|
||||
NodeRole(String jsonName)
|
||||
/**
|
||||
* Create a custom node role. Known Druid node roles should ALWAYS use the built-in static node roles:
|
||||
* ({@link #COORDINATOR}, {@link #OVERLORD}, {@link #ROUTER}, {@link #BROKER}{@link #INDEXER},
|
||||
* {@link #MIDDLE_MANAGER}, {@link #HISTORICAL}) instead of constructing a new instance.
|
||||
*/
|
||||
public NodeRole(String jsonName)
|
||||
{
|
||||
this.jsonName = jsonName;
|
||||
this(jsonName, jsonName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Lowercase for backward compatibility, as a part of the {@link DiscoveryDruidNode}'s JSON format.
|
||||
*
|
||||
* Don't need to define {@link com.fasterxml.jackson.annotation.JsonCreator} because for enum types {@link JsonValue}
|
||||
* serves for both serialization and deserialization, see the Javadoc comment of {@link JsonValue}.
|
||||
* for built-in roles, to preserve backwards compatibility when this was an enum, allow built-in node roles to specify
|
||||
* the 'name' which is used by 'toString' to be separate from the jsonName, which is the value which the node role
|
||||
* will be serialized as and deserialized from
|
||||
*/
|
||||
private NodeRole(String name, String jsonName)
|
||||
{
|
||||
this.name = name;
|
||||
this.jsonName = jsonName;
|
||||
}
|
||||
|
||||
@JsonValue
|
||||
public String getJsonName()
|
||||
{
|
||||
return jsonName;
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public static NodeRole fromJsonName(String jsonName)
|
||||
{
|
||||
return BUILT_IN_LOOKUP.getOrDefault(jsonName, new NodeRole(jsonName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
// for built-in roles, to preserve backwards compatibility when this was an enum
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* built-in node roles
|
||||
*/
|
||||
public static NodeRole[] values()
|
||||
{
|
||||
return Arrays.copyOf(BUILT_IN, BUILT_IN.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
NodeRole nodeRole = (NodeRole) o;
|
||||
return name.equals(nodeRole.name) && jsonName.equals(nodeRole.jsonName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(name, jsonName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,7 +127,7 @@ public abstract class ServerRunnable extends GuiceRunnable
|
|||
* This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
|
||||
* as part of {@link Lifecycle.Stage#ANNOUNCEMENTS}.
|
||||
*/
|
||||
protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
|
||||
public static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
|
||||
{
|
||||
public static class Child
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue