diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index bd070e3b24d..4f0d387c1ad 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -259,7 +259,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase private void setupViews() throws Exception { - baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue()) + baseView = new BatchServerInventoryView(zkPathsConfig, curator, jsonMapper, Predicates.alwaysTrue(), "test") { @Override public void registerSegmentCallback(Executor exec, final SegmentCallback callback) diff --git a/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java deleted file mode 100644 index 48d1ace22d3..00000000000 --- a/server/src/main/java/org/apache/druid/client/AbstractCuratorServerInventoryView.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.curator.inventory.CuratorInventoryManager; -import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy; -import org.apache.druid.curator.inventory.InventoryManagerConfig; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - */ -public abstract class AbstractCuratorServerInventoryView implements ServerInventoryView -{ - - private final EmittingLogger log; - private final CuratorInventoryManager inventoryManager; - private final AtomicBoolean started = new AtomicBoolean(false); - - private final ConcurrentMap serverRemovedCallbacks = new ConcurrentHashMap<>(); - private final ConcurrentMap segmentCallbacks = new ConcurrentHashMap<>(); - - public AbstractCuratorServerInventoryView( - final EmittingLogger log, - final String announcementsPath, - final String inventoryPath, - final CuratorFramework curator, - final ObjectMapper jsonMapper, - final TypeReference typeReference - ) - { - this.log = log; - this.inventoryManager = new CuratorInventoryManager<>( - curator, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return announcementsPath; - } - - @Override - public String getInventoryPath() - { - return inventoryPath; - } - }, - Execs.singleThreaded("ServerInventoryView-%s"), - new CuratorInventoryManagerStrategy() - { - @Override - public DruidServer deserializeContainer(byte[] bytes) - { - try { - return jsonMapper.readValue(bytes, DruidServer.class); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public InventoryType deserializeInventory(byte[] bytes) - { - try { - return jsonMapper.readValue(bytes, typeReference); - } - catch (IOException e) { - log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes)); - throw new RuntimeException(e); - } - } - - @Override - public void newContainer(DruidServer container) - { - log.info("New Server[%s]", container); - } - - @Override - public void deadContainer(DruidServer deadContainer) - { - log.info("Server Disappeared[%s]", deadContainer); - runServerRemovedCallbacks(deadContainer); - } - - @Override - public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) - { - return newContainer.addDataSegments(oldContainer); - } - - @Override - public DruidServer addInventory( - final DruidServer container, - String inventoryKey, - final InventoryType inventory - ) - { - return addInnerInventory(container, inventoryKey, inventory); - } - - @Override - public DruidServer updateInventory(DruidServer container, String inventoryKey, InventoryType inventory) - { - return updateInnerInventory(container, inventoryKey, inventory); - } - - @Override - public DruidServer removeInventory(final DruidServer container, String inventoryKey) - { - return removeInnerInventory(container, inventoryKey); - } - - @Override - public void inventoryInitialized() - { - log.info("Inventory Initialized"); - runSegmentCallbacks( - input -> input.segmentViewInitialized() - ); - } - } - ); - } - - @LifecycleStart - public void start() throws Exception - { - synchronized (started) { - if (!started.get()) { - inventoryManager.start(); - started.set(true); - } - } - } - - @LifecycleStop - public void stop() throws IOException - { - synchronized (started) { - if (started.getAndSet(false)) { - inventoryManager.stop(); - } - } - } - - @Override - public boolean isStarted() - { - return started.get(); - } - - @Override - public DruidServer getInventoryValue(String containerKey) - { - return inventoryManager.getInventoryValue(containerKey); - } - - @Override - public Collection getInventory() - { - return inventoryManager.getInventory(); - } - - @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) - { - serverRemovedCallbacks.put(callback, exec); - } - - @Override - public void registerSegmentCallback(Executor exec, SegmentCallback callback) - { - segmentCallbacks.put(callback, exec); - } - - public InventoryManagerConfig getInventoryManagerConfig() - { - return inventoryManager.getConfig(); - } - - protected void runSegmentCallbacks( - final Function fn - ) - { - for (final Map.Entry entry : segmentCallbacks.entrySet()) { - entry.getValue().execute( - () -> { - if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { - segmentCallbackRemoved(entry.getKey()); - segmentCallbacks.remove(entry.getKey()); - } - } - ); - } - } - - private void runServerRemovedCallbacks(final DruidServer server) - { - for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { - entry.getValue().execute( - () -> { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverRemovedCallbacks.remove(entry.getKey()); - } - } - ); - } - } - - protected void addSingleInventory(final DruidServer container, final DataSegment inventory) - { - log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId()); - - if (container.getSegment(inventory.getId()) != null) { - log.warn( - "Not adding or running callbacks for existing segment[%s] on server[%s]", - inventory.getId(), - container.getName() - ); - - return; - } - - container.addDataSegment(inventory); - - runSegmentCallbacks( - input -> input.segmentAdded(container.getMetadata(), inventory) - ); - } - - void removeSingleInventory(DruidServer container, SegmentId segmentId) - { - log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId); - if (!doRemoveSingleInventory(container, segmentId)) { - log.warn( - "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", - segmentId, - container.getName() - ); - } - } - - void removeSingleInventory(final DruidServer container, String segmentId) - { - log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId); - for (SegmentId possibleSegmentId : SegmentId.iterateAllPossibleParsings(segmentId)) { - if (doRemoveSingleInventory(container, possibleSegmentId)) { - return; - } - } - log.warn( - "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", - segmentId, - container.getName() - ); - } - - private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId) - { - DataSegment segment = container.removeDataSegment(segmentId); - if (segment != null) { - runSegmentCallbacks( - input -> input.segmentRemoved(container.getMetadata(), segment) - ); - return true; - } else { - return false; - } - } - - @Override - public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) - { - try { - DruidServer server = getInventoryValue(serverKey); - return server != null && server.getSegment(segment.getId()) != null; - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - protected abstract DruidServer addInnerInventory( - DruidServer container, - String inventoryKey, - InventoryType inventory - ); - - protected abstract DruidServer updateInnerInventory( - DruidServer container, - String inventoryKey, - InventoryType inventory - ); - - protected abstract DruidServer removeInnerInventory(DruidServer container, String inventoryKey); - - protected abstract void segmentCallbackRemoved(SegmentCallback callback); -} diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java index d3b254f192a..00dfbce8b89 100644 --- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java @@ -27,59 +27,295 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; +import org.apache.druid.curator.inventory.CuratorInventoryManager; +import org.apache.druid.curator.inventory.CuratorInventoryManagerStrategy; +import org.apache.druid.curator.inventory.InventoryManagerConfig; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class is deprecated. Use {@link HttpServerInventoryView} instead. */ @Deprecated @ManageLifecycle -public class BatchServerInventoryView extends AbstractCuratorServerInventoryView> - implements FilteredServerInventoryView +public class BatchServerInventoryView implements ServerInventoryView, FilteredServerInventoryView { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); + private final CuratorInventoryManager> inventoryManager; + private final AtomicBoolean started = new AtomicBoolean(false); + + private final ConcurrentMap serverRemovedCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap segmentCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap> zNodes = new ConcurrentHashMap<>(); private final ConcurrentMap>> segmentPredicates = new ConcurrentHashMap<>(); private final Predicate> defaultFilter; - @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, final ObjectMapper jsonMapper, - final Predicate> defaultFilter + final Predicate> defaultFilter, + final String pathChildrenCacheExecPrefix ) { - super( - log, - zkPaths.getAnnouncementsPath(), - zkPaths.getLiveSegmentsPath(), + this.inventoryManager = new CuratorInventoryManager<>( curator, - jsonMapper, - new TypeReference>() + new InventoryManagerConfig() { + @Override + public String getContainerPath() + { + return zkPaths.getAnnouncementsPath(); + } + + @Override + public String getInventoryPath() + { + return zkPaths.getLiveSegmentsPath(); + } + }, + Execs.singleThreaded(pathChildrenCacheExecPrefix + "-%s"), + new CuratorInventoryManagerStrategy>() + { + @Override + public DruidServer deserializeContainer(byte[] bytes) + { + try { + return jsonMapper.readValue(bytes, DruidServer.class); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Set deserializeInventory(byte[] bytes) + { + try { + return jsonMapper.readValue(bytes, new TypeReference>() + { + }); + } + catch (IOException e) { + log.error(e, "Could not parse json: %s", StringUtils.fromUtf8(bytes)); + throw new RuntimeException(e); + } + } + + @Override + public void newContainer(DruidServer container) + { + log.info("New Server[%s]", container); + } + + @Override + public void deadContainer(DruidServer deadContainer) + { + log.info("Server Disappeared[%s]", deadContainer); + runServerRemovedCallbacks(deadContainer); + } + + @Override + public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) + { + return newContainer.addDataSegments(oldContainer); + } + + @Override + public DruidServer addInventory( + final DruidServer container, + String inventoryKey, + final Set inventory + ) + { + return addInnerInventory(container, inventoryKey, inventory); + } + + @Override + public DruidServer updateInventory(DruidServer container, String inventoryKey, Set inventory) + { + return updateInnerInventory(container, inventoryKey, inventory); + } + + @Override + public DruidServer removeInventory(final DruidServer container, String inventoryKey) + { + return removeInnerInventory(container, inventoryKey); + } + + @Override + public void inventoryInitialized() + { + log.info("Inventory Initialized"); + runSegmentCallbacks(SegmentCallback::segmentViewInitialized); + } } ); this.defaultFilter = Preconditions.checkNotNull(defaultFilter); } + @LifecycleStart + public void start() throws Exception + { + synchronized (started) { + if (!started.get()) { + inventoryManager.start(); + started.set(true); + } + } + } + + @LifecycleStop + public void stop() throws IOException + { + synchronized (started) { + if (started.getAndSet(false)) { + inventoryManager.stop(); + } + } + } + @Override + public boolean isStarted() + { + return started.get(); + } + + @Override + public DruidServer getInventoryValue(String containerKey) + { + return inventoryManager.getInventoryValue(containerKey); + } + + @Override + public Collection getInventory() + { + return inventoryManager.getInventory(); + } + + @Override + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + { + serverRemovedCallbacks.put(callback, exec); + } + + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + segmentCallbacks.put(callback, exec); + } + + protected void runSegmentCallbacks( + final Function fn + ) + { + for (final Map.Entry entry : segmentCallbacks.entrySet()) { + entry.getValue().execute( + () -> { + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + segmentCallbackRemoved(entry.getKey()); + segmentCallbacks.remove(entry.getKey()); + } + } + ); + } + } + + private void runServerRemovedCallbacks(final DruidServer server) + { + for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { + entry.getValue().execute( + () -> { + if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { + serverRemovedCallbacks.remove(entry.getKey()); + } + } + ); + } + } + + protected void addSingleInventory(final DruidServer container, final DataSegment inventory) + { + log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getId()); + + if (container.getSegment(inventory.getId()) != null) { + log.warn( + "Not adding or running callbacks for existing segment[%s] on server[%s]", + inventory.getId(), + container.getName() + ); + + return; + } + + container.addDataSegment(inventory); + + runSegmentCallbacks( + input -> input.segmentAdded(container.getMetadata(), inventory) + ); + } + + void removeSingleInventory(DruidServer container, SegmentId segmentId) + { + log.debug("Server[%s] removed segment[%s]", container.getName(), segmentId); + if (!doRemoveSingleInventory(container, segmentId)) { + log.warn( + "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", + segmentId, + container.getName() + ); + } + } + + private boolean doRemoveSingleInventory(DruidServer container, SegmentId segmentId) + { + DataSegment segment = container.removeDataSegment(segmentId); + if (segment != null) { + runSegmentCallbacks( + input -> input.segmentRemoved(container.getMetadata(), segment) + ); + return true; + } else { + return false; + } + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + try { + DruidServer server = getInventoryValue(serverKey); + return server != null && server.getSegment(segment.getId()) != null; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + protected DruidServer addInnerInventory( final DruidServer container, String inventoryKey, @@ -102,36 +338,35 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView ); // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory - Set filteredInventory = Sets.newHashSet(Iterables.transform( - Iterables.filter( - Iterables.transform( - inventory, - new Function>() - { - @Override - public Pair apply(DataSegment input) - { - return Pair.of(container.getMetadata(), input); - } - } + Set filteredInventory = Sets.newHashSet( + Iterables.transform( + Iterables.filter( + Iterables.transform( + inventory, + new Function>() + { + @Override + public Pair apply(DataSegment input) + { + return Pair.of(container.getMetadata(), input); + } + } + ), + predicate ), - predicate - ), - new Function, DataSegment>() - { - @Override - public DataSegment apply( - Pair input - ) - { - return DataSegmentInterner.intern(input.rhs); - } - } - )); + new Function, DataSegment>() + { + @Override + public DataSegment apply(Pair input) + { + return DataSegmentInterner.intern(input.rhs); + } + } + ) + ); return filteredInventory; } - @Override protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set inventory) { Set filteredInventory = filterInventory(container, inventory); @@ -152,7 +387,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView return container; } - @Override protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey) { log.debug("Server[%s] removed container[%s]", container.getName(), inventoryKey); @@ -176,7 +410,7 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView final Predicate> filter ) { - SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter); + SegmentCallback filteringCallback = new FilteringSegmentCallback(callback, filter); segmentPredicates.put(filteringCallback, filter); registerSegmentCallback( exec, @@ -184,7 +418,6 @@ public class BatchServerInventoryView extends AbstractCuratorServerInventoryView ); } - @Override protected void segmentCallbackRemoved(SegmentCallback callback) { segmentPredicates.remove(callback); diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java index 1686a2f7790..9c555d35b2f 100644 --- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryViewProvider.java @@ -50,7 +50,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv zkPaths, curator, jsonMapper, - Predicates.alwaysTrue() + Predicates.alwaysTrue(), + "BatchServerInventoryView" ); } } diff --git a/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java index d8df8d17a94..bb796936af3 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/FilteredBatchServerInventoryViewProvider.java @@ -48,7 +48,8 @@ public class FilteredBatchServerInventoryViewProvider implements FilteredServerI zkPaths, curator, jsonMapper, - Predicates.alwaysFalse() + Predicates.alwaysFalse(), + "FilteredBatchServerInventoryView" ); } } diff --git a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java index af18fe01acd..c614a6aeaed 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java @@ -59,7 +59,8 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn httpClient, druidNodeDiscoveryProvider, Predicates.alwaysFalse(), - config + config, + "FilteredHttpServerInventoryView" ); } } diff --git a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java index 5ea3ac2c4d2..fdbb27e6382 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryViewProvider.java @@ -26,7 +26,6 @@ import com.google.inject.Provider; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class), @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class), @JsonSubTypes.Type(name = "http", value = FilteredHttpServerInventoryViewProvider.class) }) diff --git a/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java deleted file mode 100644 index 4ae88a5040c..00000000000 --- a/server/src/main/java/org/apache/druid/client/FilteredSingleServerInventoryViewProvider.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.server.initialization.ZkPathsConfig; - -import javax.validation.constraints.NotNull; - -public class FilteredSingleServerInventoryViewProvider implements FilteredServerInventoryViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public SingleServerInventoryView get() - { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); - } -} diff --git a/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java b/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java new file mode 100644 index 00000000000..f2d0a225706 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/FilteringSegmentCallback.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.base.Predicate; +import org.apache.druid.client.ServerView.CallbackAction; +import org.apache.druid.client.ServerView.SegmentCallback; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.timeline.DataSegment; + +/** + * A SegmentCallback that is called only when the given filter satisfies. + * {@link #segmentViewInitialized()} is an exception and always called + * when the view is initialized without using the filter. + * Callback methods return {@link CallbackAction#CONTINUE} when the filter does not satisfy. + */ +public class FilteringSegmentCallback implements SegmentCallback +{ + + private final SegmentCallback callback; + private final Predicate> filter; + + public FilteringSegmentCallback(SegmentCallback callback, Predicate> filter) + { + this.callback = callback; + this.filter = filter; + } + + @Override + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + final CallbackAction action; + if (filter.apply(Pair.of(server, segment))) { + action = callback.segmentAdded(server, segment); + } else { + action = CallbackAction.CONTINUE; + } + return action; + } + + @Override + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + { + final CallbackAction action; + if (filter.apply(Pair.of(server, segment))) { + action = callback.segmentRemoved(server, segment); + } else { + action = CallbackAction.CONTINUE; + } + return action; + } + + @Override + public CallbackAction segmentViewInitialized() + { + return callback.segmentViewInitialized(); + } +} diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index edb95c00002..9c6f96dfda4 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -28,14 +28,11 @@ import com.google.common.base.Predicates; import com.google.common.collect.Collections2; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; -import com.google.inject.Inject; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; -import org.apache.druid.guice.annotations.EscalatedGlobal; -import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -104,19 +101,20 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer // For each queryable server, a name -> DruidServerHolder entry is kept private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); + private final String execNamePrefix; private volatile ScheduledExecutorService executor; private final HttpClient httpClient; private final ObjectMapper smileMapper; private final HttpServerInventoryViewConfig config; - @Inject public HttpServerInventoryView( - final @Smile ObjectMapper smileMapper, - final @EscalatedGlobal HttpClient httpClient, + final ObjectMapper smileMapper, + final HttpClient httpClient, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final Predicate> defaultFilter, - final HttpServerInventoryViewConfig config + final HttpServerInventoryViewConfig config, + final String execNamePrefix ) { this.httpClient = httpClient; @@ -125,6 +123,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer this.defaultFilter = defaultFilter; this.finalPredicate = defaultFilter; this.config = config; + this.execNamePrefix = execNamePrefix; } @@ -136,12 +135,12 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer throw new ISE("can't start."); } - log.info("Starting HttpServerInventoryView."); + log.info("Starting %s.", execNamePrefix); try { executor = ScheduledExecutors.fixed( config.getNumThreads(), - "HttpServerInventoryView-%s" + execNamePrefix + "-%s" ); DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY); @@ -194,7 +193,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer lifecycleLock.exitStart(); } - log.info("Started HttpServerInventoryView."); + log.info("Started %s.", execNamePrefix); } } @@ -206,13 +205,13 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer throw new ISE("can't stop."); } - log.info("Stopping HttpServerInventoryView."); + log.info("Stopping %s.", execNamePrefix); if (executor != null) { executor.shutdownNow(); } - log.info("Stopped HttpServerInventoryView."); + log.info("Stopped %s.", execNamePrefix); } } @@ -227,7 +226,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer throw new ISE("Lifecycle has already started."); } - SegmentCallback filteringSegmentCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter); + SegmentCallback filteringSegmentCallback = new FilteringSegmentCallback(callback, filter); segmentCallbacks.put(filteringSegmentCallback, exec); segmentPredicates.put(filteringSegmentCallback, filter); diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java index 931129270fc..7f008c58798 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryViewProvider.java @@ -59,7 +59,8 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi httpClient, druidNodeDiscoveryProvider, Predicates.alwaysTrue(), - config + config, + "HttpServerInventoryView" ); } } diff --git a/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java index ad34f70b618..bc87500f905 100644 --- a/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/ServerInventoryViewProvider.java @@ -27,7 +27,6 @@ import com.google.inject.Provider; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class), @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class), @JsonSubTypes.Type(name = "http", value = HttpServerInventoryViewProvider.class), }) diff --git a/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java deleted file mode 100644 index 3e2ad48a335..00000000000 --- a/server/src/main/java/org/apache/druid/client/SingleServerInventoryProvider.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicates; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.server.initialization.ZkPathsConfig; - -import javax.validation.constraints.NotNull; - -/** - */ -public class SingleServerInventoryProvider implements ServerInventoryViewProvider -{ - @JacksonInject - @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; - - @JacksonInject - @NotNull - private ObjectMapper jsonMapper = null; - - @Override - public ServerInventoryView get() - { - return new SingleServerInventoryView( - zkPaths, - curator, - jsonMapper, - Predicates.alwaysTrue() - ); - } -} diff --git a/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java b/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java deleted file mode 100644 index 0b591378b62..00000000000 --- a/server/src/main/java/org/apache/druid/client/SingleServerInventoryView.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.client; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.inject.Inject; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.timeline.DataSegment; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; - -/** - * This class is deprecated. Use {@link HttpServerInventoryView} instead. - */ -@Deprecated -@ManageLifecycle -public class SingleServerInventoryView extends AbstractCuratorServerInventoryView implements FilteredServerInventoryView -{ - private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); - - private final ConcurrentMap>> segmentPredicates = - new ConcurrentHashMap<>(); - private final Predicate> defaultFilter; - - @Inject - public SingleServerInventoryView( - final ZkPathsConfig zkPaths, - final CuratorFramework curator, - final ObjectMapper jsonMapper, - final Predicate> defaultFilter - ) - { - super( - log, - zkPaths.getAnnouncementsPath(), - zkPaths.getServedSegmentsPath(), - curator, - jsonMapper, - new TypeReference() - { - } - ); - - Preconditions.checkNotNull(defaultFilter); - this.defaultFilter = defaultFilter; - } - - @Override - protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory) - { - Predicate> predicate = Predicates.or( - defaultFilter, - Predicates.or(segmentPredicates.values()) - ); - if (predicate.apply(Pair.of(container.getMetadata(), inventory))) { - addSingleInventory(container, DataSegmentInterner.intern(inventory)); - } - return container; - } - - @Override - protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, DataSegment inventory) - { - return addInnerInventory(container, inventoryKey, inventory); - } - - @Override - protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey) - { - removeSingleInventory(container, inventoryKey); - return container; - } - - @Override - public void registerSegmentCallback( - final Executor exec, - final SegmentCallback callback, - final Predicate> filter - ) - { - SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter); - segmentPredicates.put(filteringCallback, filter); - registerSegmentCallback( - exec, - filteringCallback - ); - } - - @Override - protected void segmentCallbackRemoved(SegmentCallback callback) - { - segmentPredicates.remove(callback); - } - - public static class FilteringSegmentCallback implements SegmentCallback - { - - private final SegmentCallback callback; - private final Predicate> filter; - - public FilteringSegmentCallback(SegmentCallback callback, Predicate> filter) - { - this.callback = callback; - this.filter = filter; - } - - @Override - public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) - { - final CallbackAction action; - if (filter.apply(Pair.of(server, segment))) { - action = callback.segmentAdded(server, segment); - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) - { - final CallbackAction action; - if (filter.apply(Pair.of(server, segment))) { - action = callback.segmentRemoved(server, segment); - } else { - action = CallbackAction.CONTINUE; - } - return action; - } - - @Override - public CallbackAction segmentViewInitialized() - { - return callback.segmentViewInitialized(); - } - } - -} diff --git a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java index 88c9d8fb1c6..5df07e96249 100644 --- a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java @@ -29,8 +29,6 @@ import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.cache.PathChildrenCacheFactory; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; @@ -100,7 +98,6 @@ public class CuratorInventoryManager .build(); } - @LifecycleStart public void start() throws Exception { PathChildrenCache childrenCache; @@ -131,7 +128,6 @@ public class CuratorInventoryManager } } - @LifecycleStop public void stop() throws IOException { synchronized (lock) { diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 0a0e6b83cac..5855c9d2e78 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -599,7 +599,8 @@ public class BrokerServerViewTest extends CuratorTestBase zkPathsConfig, curator, jsonMapper, - Predicates.alwaysTrue() + Predicates.alwaysTrue(), + "test" ) { @Override diff --git a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java index 961303ca57b..5a36fecb605 100644 --- a/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/CoordinatorServerViewTest.java @@ -292,7 +292,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase zkPathsConfig, curator, jsonMapper, - Predicates.alwaysTrue() + Predicates.alwaysTrue(), + "test" ) { @Override diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index f0fd54afddf..896f90d67dd 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -182,7 +182,8 @@ public class HttpServerInventoryViewTest httpClient, druidNodeDiscoveryProvider, (pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"), - new HttpServerInventoryViewConfig(null, null, null) + new HttpServerInventoryViewConfig(null, null, null), + "test" ); CountDownLatch initializeCallback1 = new CountDownLatch(1); diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java index 9ba44665963..25ac1ba34bc 100644 --- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java @@ -181,7 +181,8 @@ public class BatchServerInventoryViewTest }, cf, jsonMapper, - Predicates.alwaysTrue() + Predicates.alwaysTrue(), + "test" ); batchServerInventoryView.start(); @@ -204,7 +205,8 @@ public class BatchServerInventoryViewTest { return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS)); } - } + }, + "test" ) { @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index f8304085a2d..3adbbd638eb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -476,7 +476,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase zkPathsConfig, curator, jsonMapper, - Predicates.alwaysTrue() + Predicates.alwaysTrue(), + "test" ) { @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 0a2db2cf1cb..ec0201bd5c1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -29,12 +29,12 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.druid.client.BatchServerInventoryView; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.SingleServerInventoryView; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.CuratorUtils; @@ -95,7 +95,7 @@ public class DruidCoordinatorTest extends CuratorTestBase private DataSourcesSnapshot dataSourcesSnapshot; private DruidCoordinatorRuntimeParams coordinatorRuntimeParams; - private SingleServerInventoryView serverInventoryView; + private BatchServerInventoryView serverInventoryView; private ScheduledExecutorFactory scheduledExecutorFactory; private DruidServer druidServer; private ConcurrentMap loadManagementPeons; @@ -113,7 +113,7 @@ public class DruidCoordinatorTest extends CuratorTestBase public void setUp() throws Exception { druidServer = EasyMock.createMock(DruidServer.class); - serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class); + serverInventoryView = EasyMock.createMock(BatchServerInventoryView.class); segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class); dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java index 6f0d0b42c17..67a626dc536 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java @@ -29,10 +29,10 @@ import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; import org.apache.druid.client.DruidServer; import org.apache.druid.client.FilteredServerInventoryView; +import org.apache.druid.client.FilteringSegmentCallback; import org.apache.druid.client.ServerView.CallbackAction; import org.apache.druid.client.ServerView.SegmentCallback; import org.apache.druid.client.ServerView.ServerRemovedCallback; -import org.apache.druid.client.SingleServerInventoryView.FilteringSegmentCallback; import org.apache.druid.client.TimelineServerView.TimelineCallback; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; import org.apache.druid.client.selector.RandomServerSelectorStrategy;