Do not create ZK paths if not needed (#16816)

Background:
ZK-based segment loading has been completely disabled in #15705 .
ZK `servedSegmentsPath` has been deprecated since Druid 0.7.1, #1182 .
This legacy path has been replaced by the `liveSegmentsPath` and is not used in the code anymore.

Changes:
- Never create ZK loadQueuePath as it is never used.
- Never create ZK servedSegmentsPath as it is never used.
- Do not create ZK liveSegmentsPath if announcement on ZK is disabled
- Fix up tests
This commit is contained in:
Kashif Faraz 2024-08-06 06:59:13 -07:00 committed by GitHub
parent de40d81b29
commit aa49be61ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 67 additions and 252 deletions

View File

@ -160,7 +160,6 @@ Druid interacts with ZooKeeper through a set of standard path configurations. We
|`druid.zk.paths.announcementsPath`|Druid service announcement path.|`${druid.zk.paths.base}/announcements`|
|`druid.zk.paths.liveSegmentsPath`|Current path for where Druid services announce their segments.|`${druid.zk.paths.base}/segments`|
|`druid.zk.paths.coordinatorPath`|Used by the Coordinator for leader election.|`${druid.zk.paths.base}/coordinator`|
|`druid.zk.paths.servedSegmentsPath`|Deprecated. Legacy path for where Druid services announce their segments.|`${druid.zk.paths.base}/servedSegments`|
The indexing service also uses its own set of paths. These configs can be included in the common configuration.

View File

@ -53,7 +53,7 @@ ${druid.zk.paths.coordinatorPath}/_COORDINATOR
## Segment "publishing" protocol from Historical and Realtime
The `announcementsPath` and `servedSegmentsPath` are used for this.
The `announcementsPath` and `liveSegmentsPath` are used for this.
All [Historical](../design/historical.md) processes publish themselves on the `announcementsPath`, specifically, they will create an ephemeral znode at
@ -64,13 +64,13 @@ ${druid.zk.paths.announcementsPath}/${druid.host}
Which signifies that they exist. They will also subsequently create a permanent znode at
```
${druid.zk.paths.servedSegmentsPath}/${druid.host}
${druid.zk.paths.liveSegmentsPath}/${druid.host}
```
And as they load up segments, they will attach ephemeral znodes that look like
```
${druid.zk.paths.servedSegmentsPath}/${druid.host}/_segment_identifier_
${druid.zk.paths.liveSegmentsPath}/${druid.host}/_segment_identifier_
```
Processes like the [Coordinator](../design/coordinator.md) and [Broker](../design/broker.md) can then watch these paths to see which processes are currently serving which segments.

View File

@ -37,7 +37,7 @@ public class ZkEnablementConfig
@JsonCreator
public ZkEnablementConfig(@JsonProperty("enabled") Boolean enabled)
{
this.enabled = enabled == null ? true : enabled.booleanValue();
this.enabled = enabled == null || enabled;
}
public boolean isEnabled()
@ -48,6 +48,6 @@ public class ZkEnablementConfig
public static boolean isEnabled(Properties properties)
{
String value = properties.getProperty(PROP_KEY_ENABLED);
return value == null ? true : Boolean.parseBoolean(value);
return value == null || Boolean.parseBoolean(value);
}
}

View File

@ -41,10 +41,6 @@ public class ServerViewModule implements Module
public static final String SERVERVIEW_TYPE_HTTP = "http";
public static final String SERVERVIEW_TYPE_BATCH = "batch";
// this value should be consistent with the default implementation used in
// {@code ServerInventoryViewProvider} & {@code FilteredServerInventoryViewProvider}
public static final String DEFAULT_SERVERVIEW_TYPE = "http";
@Override
public void configure(Binder binder)
{

View File

@ -19,29 +19,25 @@
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
/**
* We are gradually migrating to {@link org.apache.druid.server.http.SegmentListerResource} for driving segment
* loads/drops on data server processes.
* Creates paths for announcing served segments on Zookeeper.
*
* However, this class is still the default mechanism as of this writing (2020-12-03).
* @deprecated as Druid has already migrated to HTTP-based segment loading and
* will soon migrate to HTTP-based inventory view using {@code SegmentListerResource}.
*
* @see org.apache.druid.server.http.SegmentListerResource
*/
@Deprecated
public class ZkCoordinator
@ -50,36 +46,25 @@ public class ZkCoordinator
private final Object lock = new Object();
private final DataSegmentChangeHandler dataSegmentChangeHandler;
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private final BatchDataSegmentAnnouncerConfig announcerConfig;
@Nullable
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ExecutorService segmentLoadUnloadService;
@Inject
public ZkCoordinator(
SegmentLoadDropHandler loadDropHandler,
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
CuratorFramework curator,
SegmentLoaderConfig config
BatchDataSegmentAnnouncerConfig announcerConfig
)
{
this.dataSegmentChangeHandler = loadDropHandler;
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.me = me;
this.curator = curator;
this.segmentLoadUnloadService = Execs.multiThreaded(
config.getNumLoadingThreads(),
"ZKCoordinator--%d"
);
this.announcerConfig = announcerConfig;
}
@LifecycleStart
@ -92,40 +77,17 @@ public class ZkCoordinator
log.info("Starting zkCoordinator for server[%s]", me.getName());
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
if (announcerConfig.isSkipSegmentAnnouncementOnZk()) {
log.info("Skipping zkPath creation as segment announcement on ZK is disabled.");
started = true;
return;
}
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
Execs.singleThreaded("ZkCoordinator")
);
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
log.info("Creating zkPath[%s] for announcing live segments.", liveSegmentsLocation);
try {
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadQueueCache.getListenable().addListener(
(client, event) -> {
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
childAdded(child);
break;
case CHILD_REMOVED:
log.info("zNode[%s] was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
@ -136,54 +98,6 @@ public class ZkCoordinator
}
}
private void childAdded(ChildData child)
{
segmentLoadUnloadService.submit(() -> {
final String path = child.getPath();
DataSegmentChangeRequest request = new SegmentChangeRequestNoop();
try {
final DataSegmentChangeRequest finalRequest = jsonMapper.readValue(
child.getData(),
DataSegmentChangeRequest.class
);
finalRequest.go(
dataSegmentChangeHandler,
() -> {
try {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", finalRequest.asString());
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw new RuntimeException(e);
}
}
);
}
catch (Exception e) {
// Something went wrong in either deserializing the request using jsonMapper or when invoking it
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
});
}
@LifecycleStop
public void stop()
{
@ -193,21 +107,7 @@ public class ZkCoordinator
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
loadQueueCache = null;
started = false;
}
started = false;
}
}
public boolean isStarted()
{
return started;
}
}

View File

@ -31,15 +31,10 @@ public class ZkPathsConfig
@JsonProperty
private String announcementsPath;
@JsonProperty
@Deprecated
private String servedSegmentsPath;
@JsonProperty
private String liveSegmentsPath;
@JsonProperty
private String coordinatorPath;
@JsonProperty
private String loadQueuePath;
@JsonProperty
private String connectorPath;
public String getBase()
@ -57,12 +52,12 @@ public class ZkPathsConfig
return (null == announcementsPath) ? defaultPath("announcements") : announcementsPath;
}
/**
* Path to announce served segments on.
*
* @deprecated Use HTTP-based segment discovery instead.
*/
@Deprecated
public String getServedSegmentsPath()
{
return (null == servedSegmentsPath) ? defaultPath("servedSegments") : servedSegmentsPath;
}
public String getLiveSegmentsPath()
{
return (null == liveSegmentsPath) ? defaultPath("segments") : liveSegmentsPath;
@ -78,11 +73,6 @@ public class ZkPathsConfig
return defaultPath("overlord");
}
public String getLoadQueuePath()
{
return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath;
}
public String getConnectorPath()
{
return (null == connectorPath) ? defaultPath("connector") : connectorPath;
@ -116,9 +106,7 @@ public class ZkPathsConfig
this.getConnectorPath().equals(otherConfig.getConnectorPath()) &&
this.getLiveSegmentsPath().equals(otherConfig.getLiveSegmentsPath()) &&
this.getCoordinatorPath().equals(otherConfig.getCoordinatorPath()) &&
this.getLoadQueuePath().equals(otherConfig.getLoadQueuePath()) &&
this.getPropertiesPath().equals(otherConfig.getPropertiesPath()) &&
this.getServedSegmentsPath().equals(otherConfig.getServedSegmentsPath())) {
this.getPropertiesPath().equals(otherConfig.getPropertiesPath())) {
return true;
}
return false;
@ -130,10 +118,8 @@ public class ZkPathsConfig
int result = base != null ? base.hashCode() : 0;
result = 31 * result + (propertiesPath != null ? propertiesPath.hashCode() : 0);
result = 31 * result + (announcementsPath != null ? announcementsPath.hashCode() : 0);
result = 31 * result + (servedSegmentsPath != null ? servedSegmentsPath.hashCode() : 0);
result = 31 * result + (liveSegmentsPath != null ? liveSegmentsPath.hashCode() : 0);
result = 31 * result + (coordinatorPath != null ? coordinatorPath.hashCode() : 0);
result = 31 * result + (loadQueuePath != null ? loadQueuePath.hashCode() : 0);
result = 31 * result + (connectorPath != null ? connectorPath.hashCode() : 0);
return result;
}

View File

@ -56,10 +56,8 @@ public class ZkPathsConfigTest extends JsonConfigTesterBase<ZkPathsConfig>
propertyValues.put(StringUtils.format("%s.base", CONFIG_PREFIX), base);
propertyValues.put(StringUtils.format("%s.propertiesPath", CONFIG_PREFIX), ZKPaths.makePath(base, "properties"));
propertyValues.put(StringUtils.format("%s.announcementsPath", CONFIG_PREFIX), ZKPaths.makePath(base, "announcements"));
propertyValues.put(StringUtils.format("%s.servedSegmentsPath", CONFIG_PREFIX), ZKPaths.makePath(base, "servedSegments"));
propertyValues.put(StringUtils.format("%s.liveSegmentsPath", CONFIG_PREFIX), ZKPaths.makePath(base, "segments"));
propertyValues.put(StringUtils.format("%s.coordinatorPath", CONFIG_PREFIX), ZKPaths.makePath(base, "coordinator"));
propertyValues.put(StringUtils.format("%s.loadQueuePath", CONFIG_PREFIX), ZKPaths.makePath(base, "loadQueue"));
propertyValues.put(StringUtils.format("%s.connectorPath", CONFIG_PREFIX), ZKPaths.makePath(base, "connector"));
ZkPathsConfig zkPathsConfigObj = zkPathsConfig.get();

View File

@ -19,34 +19,16 @@
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
/**
*/
public class ZkCoordinatorTest extends CuratorTestBase
public class ZkCoordinatorTest
{
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
"dummyHost",
@ -65,100 +47,55 @@ public class ZkCoordinatorTest extends CuratorTestBase
}
};
@Before
public void setUp() throws Exception
@Test(timeout = 60_000L)
public void testSegmentPathIsCreatedIfZkAnnouncementIsEnabled() throws Exception
{
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
}
@After
public void tearDown()
{
tearDownServerAndCurator();
testSegmentPathCreated(true);
}
@Test(timeout = 60_000L)
public void testLoadDrop() throws Exception
public void testSegmentPathIsNotCreatedIfZkAnnouncementIsDisabled() throws Exception
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
DataSegment segment = new DataSegment(
"test",
Intervals.of("P1d/2011-04-02"),
"v0",
ImmutableMap.of("version", "v0", "interval", Intervals.of("P1d/2011-04-02"), "cacheDir", "/no"),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
123L
testSegmentPathCreated(false);
}
private void testSegmentPathCreated(boolean announceSegmentsOnZk) throws Exception
{
final String liveSegmentsPath = ZKPaths.makePath(
zkPaths.getLiveSegmentsPath(),
me.getName()
);
CountDownLatch loadLatch = new CountDownLatch(1);
CountDownLatch dropLatch = new CountDownLatch(1);
final EnsurePath mockEnsurePath = EasyMock.mock(EnsurePath.class);
final CuratorFramework mockCurator = EasyMock.mock(CuratorFramework.class);
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
new SegmentLoaderConfig(),
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class)
)
{
@Override
public void addSegment(DataSegment s, DataSegmentChangeCallback callback)
{
if (segment.getId().equals(s.getId())) {
loadLatch.countDown();
callback.execute();
}
}
if (announceSegmentsOnZk) {
EasyMock.expect(mockCurator.newNamespaceAwareEnsurePath(liveSegmentsPath))
.andReturn(mockEnsurePath).once();
@Override
public void removeSegment(DataSegment s, DataSegmentChangeCallback callback)
{
if (segment.getId().equals(s.getId())) {
dropLatch.countDown();
callback.execute();
}
}
};
EasyMock.expect(mockCurator.getZookeeperClient())
.andReturn(null).once();
ZkCoordinator zkCoordinator = new ZkCoordinator(
segmentLoadDropHandler,
jsonMapper,
mockEnsurePath.ensure(EasyMock.anyObject());
EasyMock.expectLastCall().once();
}
EasyMock.replay(mockCurator, mockEnsurePath);
final ZkCoordinator zkCoordinator = new ZkCoordinator(
zkPaths,
me,
curator,
new SegmentLoaderConfig()
mockCurator,
new BatchDataSegmentAnnouncerConfig() {
@Override
public boolean isSkipSegmentAnnouncementOnZk()
{
return !announceSegmentsOnZk;
}
}
);
zkCoordinator.start();
String segmentZkPath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName(), segment.getId().toString());
curator
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestLoad(segment)));
loadLatch.await();
while (curator.checkExists().forPath(segmentZkPath) != null) {
Thread.sleep(100);
}
curator
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestDrop(segment)));
dropLatch.await();
while (curator.checkExists().forPath(segmentZkPath) != null) {
Thread.sleep(100);
}
EasyMock.verify();
zkCoordinator.stop();
}
}

View File

@ -63,7 +63,6 @@ of the coordinator in these situations.
interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
- communication with metadata store: `SegmentMetadataManager`, `MetadataRuleManager`
- communication with historicals: `HttpClient`, `ServerInventoryView`
- `CuratorFramework`: provided as a mock as simulations of `CuratorLoadQueuePeon` are not supported yet
4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately
reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the