Better serverView exec name; remove SingleServerInventoryView (#11770)

Druid currently has two server views: the regular server view and the filtered server view. The regular server view monitors all segment announcements from all data nodes (historicals, tasks, indexers). The filtered server view is used to watch segment announcements only from particular tiers. Since these server views keep track of different sets of druidServers and segments in memory, they must be maintained separately. However, they currently share the same name for their executorService, which can cause confusion and make debugging harder, especially in the broker, since it uses both server views: the filtered view for normal query processing and the regular view to serve the servers table (I'm unsure whether this is intended or whether it is good behavior). This PR gives each executorService a more obvious name.
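
To illustrate the change, here is a minimal sketch (not code from this patch; the wrapper class is hypothetical) of how these views create their single-threaded ZooKeeper-watching executor via Druid's Execs utility. Previously both passed the shared "ServerInventoryView-%s" name format; after this PR each call site passes its own prefix, so thread names in dumps and logs identify the owning view:

```java
import org.apache.druid.java.util.common.concurrent.Execs;

import java.util.concurrent.ExecutorService;

public class ExecNameSketch
{
  public static void main(String[] args)
  {
    // Each provider now passes a distinct prefix, e.g.:
    ExecutorService regular = Execs.singleThreaded("BatchServerInventoryView-%s");
    ExecutorService filtered = Execs.singleThreaded("FilteredBatchServerInventoryView-%s");

    // The "%s" is filled with an incrementing thread number by the thread
    // factory, yielding names like "BatchServerInventoryView-0".
    regular.shutdown();
    filtered.shutdown();
  }
}
```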

This PR also removes SingleServerInventoryView. This view was deprecated a long time ago and has not been documented since at least 0.13 (#6127). I also don't think it can ever perform better than BatchServerInventoryView. Finally, I merged AbstractCuratorServerInventoryView and BatchServerInventoryView, since AbstractCuratorServerInventoryView is no longer needed once SingleServerInventoryView is removed.
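
For reference, a sketch of how the merged class is now constructed; it mirrors BatchServerInventoryViewProvider in the diff below, with a hypothetical factory method standing in for the injection wiring:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.server.initialization.ZkPathsConfig;

public class ServerViewWiringSketch
{
  // Hypothetical helper; the real wiring lives in the providers changed below.
  public static BatchServerInventoryView makeRegularView(
      ZkPathsConfig zkPaths,
      CuratorFramework curator,
      ObjectMapper jsonMapper
  )
  {
    return new BatchServerInventoryView(
        zkPaths,
        curator,
        jsonMapper,
        Predicates.alwaysTrue(),     // default segment filter: watch everything
        "BatchServerInventoryView"   // new argument: executor name prefix
    );
  }
}
```
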
Author: Jihoon Son, 2021-12-04 05:13:05 -08:00 (committed by GitHub)
Parent: 76d281d64f
Commit: 1f052b43c5
22 changed files with 386 additions and 680 deletions


@@ -259,7 +259,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
private void setupViews() throws Exception
{
baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue())
baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue(), "test")
{
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)


@@ -1,335 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.inventory.CuratorInventoryManager;
import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
import org.apache.druid.curator.inventory.InventoryManagerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
public abstract class AbstractCuratorServerInventoryView<InventoryType> implements ServerInventoryView
{
private final EmittingLogger log;
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
public AbstractCuratorServerInventoryView(
final EmittingLogger log,
final String announcementsPath,
final String inventoryPath,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final TypeReference<InventoryType> typeReference
)
{
this.log = log;
this.inventoryManager = new CuratorInventoryManager<>(
curator,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return announcementsPath;
}
@Override
public String getInventoryPath()
{
return inventoryPath;
}
},
Execs.singleThreaded("ServerInventoryView-%s"),
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DruidServer.class);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public InventoryType deserializeInventory(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, typeReference);
}
catch (IOException e) {
log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
throw new RuntimeException(e);
}
}
@Override
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container);
}
@Override
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
runServerRemovedCallbacks(deadContainer);
}
@Override
public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
{
return newContainer.addDataSegments(oldContainer);
}
@Override
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final InventoryType inventory
)
{
return addInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer updateInventory(DruidServer container, String inventoryKey, InventoryType inventory)
{
return updateInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
return removeInnerInventory(container, inventoryKey);
}
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
input -> input.segmentViewInitialized()
);
}
}
);
}
@LifecycleStart
public void start() throws Exception
{
synchronized (started) {
if (!started.get()) {
inventoryManager.start();
started.set(true);
}
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (started) {
if (started.getAndSet(false)) {
inventoryManager.stop();
}
}
}
@Override
public boolean isStarted()
{
return started.get();
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
return inventoryManager.getInventoryValue(containerKey);
}
@Override
public Collection<DruidServer> getInventory()
{
return inventoryManager.getInventory();
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverRemovedCallbacks.put(callback, exec);
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
segmentCallbacks.put(callback, exec);
}
public InventoryManagerConfig getInventoryManagerConfig()
{
return inventoryManager.getConfig();
}
protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
);
}
}
private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
}
);
}
}
protected void addSingleInventory(final DruidServer container, final DataSegment inventory)
{
log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId());
if (container.getSegment(inventory.getId()) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventory.getId(),
container.getName()
);
return;
}
container.addDataSegment(inventory);
runSegmentCallbacks(
input -> input.segmentAdded(container.getMetadata(), inventory)
);
}
void removeSingleInventory(DruidServer container, SegmentId segmentId)
{
log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
if (!doRemoveSingleInventory(container, segmentId)) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
segmentId,
container.getName()
);
}
}
void removeSingleInventory(final DruidServer container, String segmentId)
{
log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
for (SegmentId possibleSegmentId : SegmentId.iterateAllPossibleParsings(segmentId)) {
if (doRemoveSingleInventory(container, possibleSegmentId)) {
return;
}
}
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
segmentId,
container.getName()
);
}
private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId)
{
DataSegment segment = container.removeDataSegment(segmentId);
if (segment != null) {
runSegmentCallbacks(
input -> input.segmentRemoved(container.getMetadata(), segment)
);
return true;
} else {
return false;
}
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
DruidServer server = getInventoryValue(serverKey);
return server != null && server.getSegment(segment.getId()) != null;
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
protected abstract DruidServer addInnerInventory(
DruidServer container,
String inventoryKey,
InventoryType inventory
);
protected abstract DruidServer updateInnerInventory(
DruidServer container,
String inventoryKey,
InventoryType inventory
);
protected abstract DruidServer removeInnerInventory(DruidServer container, String inventoryKey);
protected abstract void segmentCallbackRemoved(SegmentCallback callback);
}


@@ -27,59 +27,295 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.inventory.CuratorInventoryManager;
import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy;
import org.apache.druid.curator.inventory.InventoryManagerConfig;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is deprecated. Use {@link HttpServerInventoryView} instead.
*/
@Deprecated
@ManageLifecycle
public class BatchServerInventoryView extends AbstractCuratorServerInventoryView<Set<DataSegment>>
implements FilteredServerInventoryView
public class BatchServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
private final CuratorInventoryManager<DruidServer, Set<DataSegment>> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Set<DataSegment>> zNodes = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
new ConcurrentHashMap<>();
private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
@Inject
public BatchServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
final String pathChildrenCacheExecPrefix
)
{
super(
log,
zkPaths.getAnnouncementsPath(),
zkPaths.getLiveSegmentsPath(),
this.inventoryManager = new CuratorInventoryManager<>(
curator,
jsonMapper,
new TypeReference<Set<DataSegment>>()
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getLiveSegmentsPath();
}
},
Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"),
new CuratorInventoryManagerStrategy<DruidServer, Set<DataSegment>>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, DruidServer.class);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Set<DataSegment> deserializeInventory(byte[] bytes)
{
try {
return jsonMapper.readValue(bytes, new TypeReference<Set<DataSegment>>()
{
});
}
catch (IOException e) {
log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes));
throw new RuntimeException(e);
}
}
@Override
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container);
}
@Override
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
runServerRemovedCallbacks(deadContainer);
}
@Override
public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer)
{
return newContainer.addDataSegments(oldContainer);
}
@Override
public DruidServer addInventory(
final DruidServer container,
String inventoryKey,
final Set<DataSegment> inventory
)
{
return addInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer updateInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
{
return updateInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
return removeInnerInventory(container, inventoryKey);
}
@Override
public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(SegmentCallback::segmentViewInitialized);
}
}
);
this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
}
@LifecycleStart
public void start() throws Exception
{
synchronized (started) {
if (!started.get()) {
inventoryManager.start();
started.set(true);
}
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (started) {
if (started.getAndSet(false)) {
inventoryManager.stop();
}
}
}
@Override
public boolean isStarted()
{
return started.get();
}
@Override
public DruidServer getInventoryValue(String containerKey)
{
return inventoryManager.getInventoryValue(containerKey);
}
@Override
public Collection<DruidServer> getInventory()
{
return inventoryManager.getInventory();
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverRemovedCallbacks.put(callback, exec);
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
segmentCallbacks.put(callback, exec);
}
protected void runSegmentCallbacks(
final Function<SegmentCallback, CallbackAction> fn
)
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
);
}
}
private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
}
);
}
}
protected void addSingleInventory(final DruidServer container, final DataSegment inventory)
{
log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId());
if (container.getSegment(inventory.getId()) != null) {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
inventory.getId(),
container.getName()
);
return;
}
container.addDataSegment(inventory);
runSegmentCallbacks(
input -> input.segmentAdded(container.getMetadata(), inventory)
);
}
void removeSingleInventory(DruidServer container, SegmentId segmentId)
{
log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId);
if (!doRemoveSingleInventory(container, segmentId)) {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
segmentId,
container.getName()
);
}
}
private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId)
{
DataSegment segment = container.removeDataSegment(segmentId);
if (segment != null) {
runSegmentCallbacks(
input -> input.segmentRemoved(container.getMetadata(), segment)
);
return true;
} else {
return false;
}
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
DruidServer server = getInventoryValue(serverKey);
return server != null && server.getSegment(segment.getId()) != null;
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
protected DruidServer addInnerInventory(
final DruidServer container,
String inventoryKey,
@@ -102,36 +338,35 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
);
// make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.transform(
Iterables.filter(
Iterables.transform(
inventory,
new Function<DataSegment, Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public Pair<DruidServerMetadata, DataSegment> apply(DataSegment input)
{
return Pair.of(container.getMetadata(), input);
}
}
Set<DataSegment> filteredInventory = Sets.newHashSet(
Iterables.transform(
Iterables.filter(
Iterables.transform(
inventory,
new Function<DataSegment, Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public Pair<DruidServerMetadata, DataSegment> apply(DataSegment input)
{
return Pair.of(container.getMetadata(), input);
}
}
),
predicate
),
predicate
),
new Function<Pair<DruidServerMetadata, DataSegment>, DataSegment>()
{
@Override
public DataSegment apply(
Pair<DruidServerMetadata, DataSegment> input
)
{
return DataSegmentInterner.intern(input.rhs);
}
}
));
new Function<Pair<DruidServerMetadata, DataSegment>, DataSegment>()
{
@Override
public DataSegment apply(Pair<DruidServerMetadata, DataSegment> input)
{
return DataSegmentInterner.intern(input.rhs);
}
}
)
);
return filteredInventory;
}
@Override
protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
{
Set<DataSegment> filteredInventory = filterInventory(container, inventory);
@@ -152,7 +387,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
return container;
}
@Override
protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
{
log.debug("Server[%s] removed container[%s]", container.getName(), inventoryKey);
@@ -176,7 +410,7 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
SegmentCallback filteringCallback = new FilteringSegmentCallback(callback, filter);
segmentPredicates.put(filteringCallback, filter);
registerSegmentCallback(
exec,
@@ -184,7 +418,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView
);
}
@Override
protected void segmentCallbackRemoved(SegmentCallback callback)
{
segmentPredicates.remove(callback);


@@ -50,7 +50,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
zkPaths,
curator,
jsonMapper,
Predicates.alwaysTrue()
Predicates.alwaysTrue(),
"BatchServerInventoryView"
);
}
}


@@ -48,7 +48,8 @@ public class FilteredBatchServerInventoryViewProvider implements FilteredServerI
zkPaths,
curator,
jsonMapper,
Predicates.alwaysFalse()
Predicates.alwaysFalse(),
"FilteredBatchServerInventoryView"
);
}
}


@@ -59,7 +59,8 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
httpClient,
druidNodeDiscoveryProvider,
Predicates.alwaysFalse(),
config
config,
"FilteredHttpServerInventoryView"
);
}
}


@@ -26,7 +26,6 @@ import com.google.inject.Provider;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class),
@JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class),
@JsonSubTypes.Type(name = "http", value = FilteredHttpServerInventoryViewProvider.class)
})


@@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.server.initialization.ZkPathsConfig;
import javax.validation.constraints.NotNull;
public class FilteredSingleServerInventoryViewProvider implements FilteredServerInventoryViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public SingleServerInventoryView get()
{
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse());
}
}


@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.google.common.base.Predicate;
import org.apache.druid.client.ServerView.CallbackAction;
import org.apache.druid.client.ServerView.SegmentCallback;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
/**
* A SegmentCallback that is invoked only when the given filter is satisfied.
* {@link #segmentViewInitialized()} is an exception: it is always called
* when the view is initialized, without consulting the filter.
* Callback methods return {@link CallbackAction#CONTINUE} when the filter is not satisfied.
*/
public class FilteringSegmentCallback implements SegmentCallback
{
private final SegmentCallback callback;
private final Predicate<Pair<DruidServerMetadata, DataSegment>> filter;
public FilteringSegmentCallback(SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter)
{
this.callback = callback;
this.filter = filter;
}
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
final CallbackAction action;
if (filter.apply(Pair.of(server, segment))) {
action = callback.segmentAdded(server, segment);
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
@Override
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
final CallbackAction action;
if (filter.apply(Pair.of(server, segment))) {
action = callback.segmentRemoved(server, segment);
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
}
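
A short usage sketch for this newly extracted class (not part of the patch; the tier-filtering helper below is hypothetical). Wrapping a callback this way delivers segmentAdded/segmentRemoved events only for servers in the given tier, while segmentViewInitialized() always passes through:

```java
import com.google.common.base.Predicate;
import org.apache.druid.client.FilteringSegmentCallback;
import org.apache.druid.client.ServerView.SegmentCallback;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;

public class FilteringCallbackSketch
{
  // Hypothetical helper: restrict a callback to segment events from one tier.
  public static SegmentCallback forTier(SegmentCallback delegate, String tier)
  {
    Predicate<Pair<DruidServerMetadata, DataSegment>> filter =
        pair -> tier.equals(pair.lhs.getTier());
    return new FilteringSegmentCallback(delegate, filter);
  }
}
```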


@@ -28,14 +28,11 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.google.inject.Inject;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -104,19 +101,20 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
// For each queryable server, a name -> DruidServerHolder entry is kept
private final ConcurrentHashMap<String, DruidServerHolder> servers = new ConcurrentHashMap<>();
private final String execNamePrefix;
private volatile ScheduledExecutorService executor;
private final HttpClient httpClient;
private final ObjectMapper smileMapper;
private final HttpServerInventoryViewConfig config;
@Inject
public HttpServerInventoryView(
final @Smile ObjectMapper smileMapper,
final @EscalatedGlobal HttpClient httpClient,
final ObjectMapper smileMapper,
final HttpClient httpClient,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
final HttpServerInventoryViewConfig config
final HttpServerInventoryViewConfig config,
final String execNamePrefix
)
{
this.httpClient = httpClient;
@@ -125,6 +123,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
this.defaultFilter = defaultFilter;
this.finalPredicate = defaultFilter;
this.config = config;
this.execNamePrefix = execNamePrefix;
}
@@ -136,12 +135,12 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
throw new ISE("can't start.");
}
log.info("Starting HttpServerInventoryView.");
log.info("Starting %s.", execNamePrefix);
try {
executor = ScheduledExecutors.fixed(
config.getNumThreads(),
"HttpServerInventoryView-%s"
execNamePrefix + "-%s"
);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY);
@@ -194,7 +193,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
lifecycleLock.exitStart();
}
log.info("Started HttpServerInventoryView.");
log.info("Started %s.", execNamePrefix);
}
}
@@ -206,13 +205,13 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
throw new ISE("can't stop.");
}
log.info("Stopping HttpServerInventoryView.");
log.info("Stopping %s.", execNamePrefix);
if (executor != null) {
executor.shutdownNow();
}
log.info("Stopped HttpServerInventoryView.");
log.info("Stopped %s.", execNamePrefix);
}
}
@@ -227,7 +226,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
throw new ISE("Lifecycle has already started.");
}
SegmentCallback filteringSegmentCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
SegmentCallback filteringSegmentCallback = new FilteringSegmentCallback(callback, filter);
segmentCallbacks.put(filteringSegmentCallback, exec);
segmentPredicates.put(filteringSegmentCallback, filter);


@@ -59,7 +59,8 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
httpClient,
druidNodeDiscoveryProvider,
Predicates.alwaysTrue(),
config
config,
"HttpServerInventoryView"
);
}
}


@@ -27,7 +27,6 @@ import com.google.inject.Provider;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class),
@JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class),
@JsonSubTypes.Type(name = "http", value = HttpServerInventoryViewProvider.class),
})


@@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.server.initialization.ZkPathsConfig;
import javax.validation.constraints.NotNull;
/**
*/
public class SingleServerInventoryProvider implements ServerInventoryViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public ServerInventoryView get()
{
return new SingleServerInventoryView(
zkPaths,
curator,
jsonMapper,
Predicates.alwaysTrue()
);
}
}


@@ -1,166 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
* This class is deprecated. Use {@link HttpServerInventoryView} instead.
*/
@Deprecated
@ManageLifecycle
public class SingleServerInventoryView extends AbstractCuratorServerInventoryView<DataSegment> implements FilteredServerInventoryView
{
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
new ConcurrentHashMap<>();
private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
@Inject
public SingleServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ObjectMapper jsonMapper,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
)
{
super(
log,
zkPaths.getAnnouncementsPath(),
zkPaths.getServedSegmentsPath(),
curator,
jsonMapper,
new TypeReference<DataSegment>()
{
}
);
Preconditions.checkNotNull(defaultFilter);
this.defaultFilter = defaultFilter;
}
@Override
protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
{
Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
defaultFilter,
Predicates.or(segmentPredicates.values())
);
if (predicate.apply(Pair.of(container.getMetadata(), inventory))) {
addSingleInventory(container, DataSegmentInterner.intern(inventory));
}
return container;
}
@Override
protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory)
{
return addInnerInventory(container, inventoryKey, inventory);
}
@Override
protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
{
removeSingleInventory(container, inventoryKey);
return container;
}
@Override
public void registerSegmentCallback(
final Executor exec,
final SegmentCallback callback,
final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
segmentPredicates.put(filteringCallback, filter);
registerSegmentCallback(
exec,
filteringCallback
);
}
@Override
protected void segmentCallbackRemoved(SegmentCallback callback)
{
segmentPredicates.remove(callback);
}
public static class FilteringSegmentCallback implements SegmentCallback
{
private final SegmentCallback callback;
private final Predicate<Pair<DruidServerMetadata, DataSegment>> filter;
public FilteringSegmentCallback(SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter)
{
this.callback = callback;
this.filter = filter;
}
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
final CallbackAction action;
if (filter.apply(Pair.of(server, segment))) {
action = callback.segmentAdded(server, segment);
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
@Override
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
final CallbackAction action;
if (filter.apply(Pair.of(server, segment))) {
action = callback.segmentRemoved(server, segment);
} else {
action = CallbackAction.CONTINUE;
}
return action;
}
@Override
public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}
}
}


@@ -29,8 +29,6 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
@@ -100,7 +98,6 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
.build();
}
@LifecycleStart
public void start() throws Exception
{
PathChildrenCache childrenCache;
@@ -131,7 +128,6 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
}
@LifecycleStop
public void stop() throws IOException
{
synchronized (lock) {


@@ -599,7 +599,8 @@ public class BrokerServerViewTest extends CuratorTestBase
zkPathsConfig,
curator,
jsonMapper,
Predicates.alwaysTrue()
Predicates.alwaysTrue(),
"test"
)
{
@Override


@@ -292,7 +292,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase
zkPathsConfig,
curator,
jsonMapper,
Predicates.alwaysTrue()
Predicates.alwaysTrue(),
"test"
)
{
@Override


@@ -182,7 +182,8 @@ public class HttpServerInventoryViewTest
httpClient,
druidNodeDiscoveryProvider,
(pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"),
new HttpServerInventoryViewConfig(null, null, null)
new HttpServerInventoryViewConfig(null, null, null),
"test"
);
CountDownLatch initializeCallback1 = new CountDownLatch(1);


@@ -181,7 +181,8 @@ public class BatchServerInventoryViewTest
},
cf,
jsonMapper,
Predicates.alwaysTrue()
Predicates.alwaysTrue(),
"test"
);
batchServerInventoryView.start();
@@ -204,7 +205,8 @@
{
return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
}
}
},
"test"
)
{
@Override


@@ -476,7 +476,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
zkPathsConfig,
curator,
jsonMapper,
Predicates.alwaysTrue()
Predicates.alwaysTrue(),
"test"
)
{
@Override


@@ -29,12 +29,12 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.SingleServerInventoryView;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
@@ -95,7 +95,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private DataSourcesSnapshot dataSourcesSnapshot;
private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
private SingleServerInventoryView serverInventoryView;
private BatchServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
@@ -113,7 +113,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
public void setUp() throws Exception
{
druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);


@@ -29,10 +29,10 @@ import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.FilteringSegmentCallback;
import org.apache.druid.client.ServerView.CallbackAction;
import org.apache.druid.client.ServerView.SegmentCallback;
import org.apache.druid.client.ServerView.ServerRemovedCallback;
import org.apache.druid.client.SingleServerInventoryView.FilteringSegmentCallback;
import org.apache.druid.client.TimelineServerView.TimelineCallback;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;