Make brokers backwards compatible

In 0.19, Brokers gained the ability to serve segments. To support this change,
a `BROKER` ServerType was added to `druid.server.coordination`.

Druid nodes prior to this change do not know of this new server type and so
they would fail to deserialize this node's announcement.

This change makes it so that the broker only announces itself if the segment
cache is configured on the broker. It is expected that a Druid admin will only
configure the segment cache on the broker once the cluster has been upgraded
to a version that supports a broker using the segment cache.
This commit is contained in:
Suneet Saldanha 2020-06-28 18:47:22 -07:00
parent 35c7c0ec25
commit f53edb61e0
3 changed files with 56 additions and 13 deletions

View File

@ -23,6 +23,7 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.google.inject.name.Named;
import com.google.inject.util.Providers;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.discovery.DataNodeService;
@ -39,6 +40,8 @@ import javax.annotation.Nullable;
*/
public class StorageNodeModule implements Module
{
public static final String IS_SEGMENT_CACHE_CONFIGURED = "IS_SEGMENT_CACHE_CONFIGURED";
@Override
public void configure(Binder binder)
{
@ -87,4 +90,12 @@ public class StorageNodeModule implements Module
config.getPriority()
);
}
@Provides
@LazySingleton
@Named(IS_SEGMENT_CACHE_CONFIGURED)
public Boolean isSegmentCacheConfigured(SegmentLoaderConfig segmentLoaderConfig)
{
return !segmentLoaderConfig.getLocations().isEmpty();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
@ -48,6 +49,7 @@ import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.StorageNodeModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
@ -139,11 +141,19 @@ public class CliBroker extends ServerRunnable
LifecycleModule.register(binder, ZkCoordinator.class);
// We only want the broker to announce itself as a data node if the segment cache is configured.
// This is because, prior to 0.19, the BROKER type node did not exist, so older nodes will not be able to
// de-serialize any announcements made by a newer node. If a Druid admin configures the segment cache on the
// broker, it is assumed that all other nodes in the cluster are at the minimum version required to support
// the broker running as a data node.
bindNodeRoleAndAnnouncer(
binder,
DiscoverySideEffectsProvider
.builder(NodeRole.BROKER)
.serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
.serviceClasses(ImmutableMap.of(
LookupNodeService.class, ALWAYS_ENABLED,
DataNodeService.class, StorageNodeModule.IS_SEGMENT_CACHE_CONFIGURED)
)
.useLegacyAnnouncer(true)
.build()
);

View File

@ -19,7 +19,7 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
@ -27,6 +27,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.name.Names;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
@ -39,14 +40,19 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import javax.annotation.Nullable;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*
*/
public abstract class ServerRunnable extends GuiceRunnable
{
protected static final String ALWAYS_ENABLED = "";
public ServerRunnable(Logger log)
{
super(log);
@ -132,7 +138,7 @@ public abstract class ServerRunnable extends GuiceRunnable
public static class Builder
{
private NodeRole nodeRole;
private List<Class<? extends DruidService>> serviceClasses = ImmutableList.of();
private Map<Class<? extends DruidService>, String> serviceClasses = ImmutableMap.of();
private boolean useLegacyAnnouncer;
public Builder(final NodeRole nodeRole)
@ -140,12 +146,19 @@ public abstract class ServerRunnable extends GuiceRunnable
this.nodeRole = nodeRole;
}
public Builder serviceClasses(final List<Class<? extends DruidService>> serviceClasses)
public Builder serviceClasses(final Map<Class<? extends DruidService>, String> serviceClasses)
{
this.serviceClasses = serviceClasses;
return this;
}
public Builder serviceClasses(final List<Class<? extends DruidService>> serviceClasses)
{
this.serviceClasses =
serviceClasses.stream().collect(Collectors.toMap(clazz -> clazz, clazz -> ALWAYS_ENABLED));
return this;
}
public Builder useLegacyAnnouncer(final boolean useLegacyAnnouncer)
{
this.useLegacyAnnouncer = useLegacyAnnouncer;
@ -154,7 +167,11 @@ public abstract class ServerRunnable extends GuiceRunnable
public DiscoverySideEffectsProvider build()
{
return new DiscoverySideEffectsProvider(nodeRole, serviceClasses, useLegacyAnnouncer);
return new DiscoverySideEffectsProvider(
nodeRole,
serviceClasses,
useLegacyAnnouncer
);
}
}
@ -180,17 +197,17 @@ public abstract class ServerRunnable extends GuiceRunnable
private Injector injector;
private final NodeRole nodeRole;
private final List<Class<? extends DruidService>> serviceClasses;
private final Map<Class<? extends DruidService>, String> serviceClassAndBindingMap;
private final boolean useLegacyAnnouncer;
private DiscoverySideEffectsProvider(
final NodeRole nodeRole,
final List<Class<? extends DruidService>> serviceClasses,
final Map<Class<? extends DruidService>, String> serviceClassAndBindingMap,
final boolean useLegacyAnnouncer
)
{
this.nodeRole = nodeRole;
this.serviceClasses = serviceClasses;
this.serviceClassAndBindingMap = serviceClassAndBindingMap;
this.useLegacyAnnouncer = useLegacyAnnouncer;
}
@ -198,11 +215,17 @@ public abstract class ServerRunnable extends GuiceRunnable
public Child get()
{
ImmutableMap.Builder<String, DruidService> builder = new ImmutableMap.Builder<>();
for (Class<? extends DruidService> clazz : serviceClasses) {
DruidService service = injector.getInstance(clazz);
builder.put(service.getName(), service);
}
for (Map.Entry<Class<? extends DruidService>, String> clazzAndBindingName : serviceClassAndBindingMap.entrySet()) {
@Nullable String bindingName = clazzAndBindingName.getValue();
boolean isServiceEnabled = Strings.isNullOrEmpty(bindingName) || injector.getInstance(
Key.get(Boolean.class, Names.named(bindingName))
);
if (isServiceEnabled) {
DruidService service = injector.getInstance(clazzAndBindingName.getKey());
builder.put(service.getName(), service);
}
}
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeRole, builder.build());
lifecycle.addHandler(
@ -232,7 +255,6 @@ public abstract class ServerRunnable extends GuiceRunnable
},
Lifecycle.Stage.ANNOUNCEMENTS
);
return new Child();
}
}