From dbf63f738fbdd74b4d9f9b48f6c02a59615f64ae Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 19 Apr 2016 22:40:06 +0530 Subject: [PATCH] Add ability to filter segments for specific dataSources on broker without creating tiers (#2848) * Add back FilteredServerView removed in a32906c7fd11c9a8554df2621a172353a523a9dd to reduce memory usage using watched tiers. * Add functionality to specify "druid.broker.segment.watchedDataSources" --- docs/content/configuration/broker.md | 3 +- .../druid/indexing/test/TestServerView.java | 44 +++++- .../client/BatchServerInventoryView.java | 87 +++++++++- .../BatchServerInventoryViewProvider.java | 9 +- .../client/BrokerSegmentWatcherConfig.java | 8 + .../io/druid/client/BrokerServerView.java | 47 ++++-- ...teredBatchServerInventoryViewProvider.java | 57 +++++++ .../client/FilteredServerInventoryView.java | 36 +++++ .../FilteredServerInventoryViewProvider.java | 34 ++++ ...eredSingleServerInventoryViewProvider.java | 52 ++++++ .../io/druid/client/ServerInventoryView.java | 3 + .../client/SingleServerInventoryProvider.java | 9 +- .../client/SingleServerInventoryView.java | 93 ++++++++++- .../java/io/druid/guice/ServerViewModule.java | 6 + .../io/druid/server/ClientInfoResource.java | 5 +- .../BrokerSegmentWatcherConfigTest.java | 4 +- .../io/druid/client/BrokerServerViewTest.java | 6 +- .../client/CoordinatorServerViewTest.java | 4 +- .../client/BatchServerInventoryViewTest.java | 149 ++++++++++++++++-- .../druid/server/ClientInfoResourceTest.java | 7 +- 20 files changed, 609 insertions(+), 54 deletions(-) create mode 100644 server/src/main/java/io/druid/client/FilteredBatchServerInventoryViewProvider.java create mode 100644 server/src/main/java/io/druid/client/FilteredServerInventoryView.java create mode 100644 server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java create mode 100644 server/src/main/java/io/druid/client/FilteredSingleServerInventoryViewProvider.java diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 494b1fb7ab2..6ea33be3c5f 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -100,5 +100,6 @@ See [cache configuration](caching.html) for how to configure cache settings. |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| -|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index d281c8db492..8a333c0d08e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -22,6 +22,8 @@ package io.druid.indexing.test; import com.google.common.base.Predicate; import com.google.common.collect.Maps; import com.metamx.common.Pair; +import io.druid.client.DruidServer; +import io.druid.client.FilteredServerInventoryView; import io.druid.client.ServerView; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; @@ -30,9 +32,25 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -public class TestServerView implements ServerView.SegmentCallback +public class TestServerView implements FilteredServerInventoryView, ServerView.SegmentCallback { - final ConcurrentMap, Executor>> callbacks = Maps.newConcurrentMap(); + final ConcurrentMap>, Executor>> callbacks = Maps.newConcurrentMap(); + + @Override + public void registerSegmentCallback( + final Executor exec, + final ServerView.SegmentCallback callback, + final Predicate> filter + ) + { + callbacks.put(callback, Pair.of(filter, exec)); + } + + @Override + public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) + { + // No-op + } @Override public ServerView.CallbackAction segmentAdded( @@ -40,8 +58,8 @@ public class TestServerView implements ServerView.SegmentCallback final DataSegment segment ) { - for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { - if (entry.getValue().lhs.apply(segment)) { + for (final Map.Entry>, Executor>> entry : callbacks.entrySet()) { + if (entry.getValue().lhs.apply(Pair.of(server,segment))) { entry.getValue().rhs.execute( new Runnable() { @@ -64,8 +82,8 @@ public class TestServerView implements ServerView.SegmentCallback final DataSegment segment ) { - for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { - if (entry.getValue().lhs.apply(segment)) { + for (final Map.Entry>, Executor>> entry : callbacks.entrySet()) { + if (entry.getValue().lhs.apply(Pair.of(server, segment))) { entry.getValue().rhs.execute( new Runnable() { @@ -85,7 +103,7 @@ public class TestServerView implements ServerView.SegmentCallback @Override public ServerView.CallbackAction segmentViewInitialized() { - for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { + for (final Map.Entry>, Executor>> entry : callbacks.entrySet()) { entry.getValue().rhs.execute( new Runnable() { @@ -100,4 +118,16 @@ public class TestServerView implements ServerView.SegmentCallback return ServerView.CallbackAction.CONTINUE; } + + @Override + public DruidServer getInventoryValue(String string) + { + return null; + } + + @Override + public Iterable getInventory() + { + return null; + } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 35fc92325dc..d868416bf8c 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -21,33 +21,46 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; /** */ @ManageLifecycle public class BatchServerInventoryView extends ServerInventoryView> + implements FilteredServerInventoryView { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); final private ConcurrentMap> zNodes = new MapMaker().makeMap(); + final private ConcurrentMap>> segmentPredicates = new MapMaker() + .makeMap(); + final private Predicate> defaultFilter; @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final Predicate> defaultFilter ) { super( @@ -60,6 +73,8 @@ public class BatchServerInventoryView extends ServerInventoryView inventory ) { - zNodes.put(inventoryKey, inventory); - for (DataSegment segment : inventory) { + Set filteredInventory = filterInventory(container, inventory); + zNodes.put(inventoryKey, filteredInventory); + for (DataSegment segment : filteredInventory) { addSingleInventory(container, segment); } return container; } + private Set filterInventory(final DruidServer container, Set inventory) + { + Predicate> predicate = Predicates.or( + defaultFilter, + Predicates.or(segmentPredicates.values()) + ); + + // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory + Set filteredInventory = Sets.newHashSet(Iterables.transform( + Iterables.filter( + Iterables.transform( + inventory, + new Function>() + { + @Override + public Pair apply(DataSegment input) + { + return Pair.of(container.getMetadata(), input); + } + } + ), + predicate + ), + new Function, DataSegment>() + { + @Override + public DataSegment apply( + Pair input + ) + { + return input.rhs; + } + } + )); + return filteredInventory; + } + @Override protected DruidServer updateInnerInventory( DruidServer container, String inventoryKey, Set inventory ) { + Set filteredInventory = filterInventory(container, inventory); + Set existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); } - for (DataSegment segment : Sets.difference(inventory, existing)) { + for (DataSegment segment : Sets.difference(filteredInventory, existing)) { addSingleInventory(container, segment); } - for (DataSegment segment : Sets.difference(existing, inventory)) { + for (DataSegment segment : Sets.difference(existing, filteredInventory)) { removeSingleInventory(container, segment.getIdentifier()); } - zNodes.put(inventoryKey, inventory); + zNodes.put(inventoryKey, filteredInventory); return container; } @@ -113,4 +168,24 @@ public class BatchServerInventoryView extends ServerInventoryView> filter + ) + { + SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter); + segmentPredicates.put(filteringCallback, filter); + registerSegmentCallback( + exec, + filteringCallback + ); + } + + @Override + protected void segmentCallbackRemoved(SegmentCallback callback) + { + segmentPredicates.remove(callback); + } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java index 9a1ef188f00..4b462577489 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java @@ -22,6 +22,8 @@ package io.druid.client; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; +import com.metamx.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -47,6 +49,11 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv @Override public BatchServerInventoryView get() { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper); + return new BatchServerInventoryView( + zkPaths, + curator, + jsonMapper, + Predicates.>alwaysTrue() + ); } } diff --git a/server/src/main/java/io/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/io/druid/client/BrokerSegmentWatcherConfig.java index 0ef6f090a39..f19b7b898b9 100644 --- a/server/src/main/java/io/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/io/druid/client/BrokerSegmentWatcherConfig.java @@ -30,8 +30,16 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedTiers = null; + @JsonProperty + private Set watchedDataSources = null; + public Set getWatchedTiers() { return watchedTiers; } + + public Set getWatchedDataSources() + { + return watchedDataSources; + } } diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 1373c9c877c..95826bb60ca 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -20,10 +20,12 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.inject.Inject; +import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; @@ -42,6 +44,7 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; +import javax.annotation.Nullable; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -64,10 +67,10 @@ public class BrokerServerView implements TimelineServerView private final QueryWatcher queryWatcher; private final ObjectMapper smileMapper; private final HttpClient httpClient; - private final ServerInventoryView baseView; + private final FilteredServerInventoryView baseView; private final TierSelectorStrategy tierSelectorStrategy; private final ServiceEmitter emitter; - private final BrokerSegmentWatcherConfig segmentWatcherConfig; + private final Predicate> segmentFilter; private volatile boolean initialized = false; @@ -77,10 +80,10 @@ public class BrokerServerView implements TimelineServerView QueryWatcher queryWatcher, @Smile ObjectMapper smileMapper, @Client HttpClient httpClient, - ServerInventoryView baseView, + FilteredServerInventoryView baseView, TierSelectorStrategy tierSelectorStrategy, ServiceEmitter emitter, - BrokerSegmentWatcherConfig segmentWatcherConfig + final BrokerSegmentWatcherConfig segmentWatcherConfig ) { this.warehouse = warehouse; @@ -90,12 +93,30 @@ public class BrokerServerView implements TimelineServerView this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; - this.segmentWatcherConfig = segmentWatcherConfig; - this.clients = Maps.newConcurrentMap(); this.selectors = Maps.newHashMap(); this.timelines = Maps.newHashMap(); + this.segmentFilter = new Predicate>() + { + @Override + public boolean apply( + Pair input + ) + { + if (segmentWatcherConfig.getWatchedTiers() != null + && !segmentWatcherConfig.getWatchedTiers().contains(input.lhs.getTier())) { + return false; + } + + if (segmentWatcherConfig.getWatchedDataSources() != null + && !segmentWatcherConfig.getWatchedDataSources().contains(input.rhs.getDataSource())) { + return false; + } + + return true; + } + }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( exec, @@ -121,7 +142,8 @@ public class BrokerServerView implements TimelineServerView initialized = true; return ServerView.CallbackAction.CONTINUE; } - } + }, + segmentFilter ); baseView.registerServerCallback( @@ -191,10 +213,7 @@ public class BrokerServerView implements TimelineServerView private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { - if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers() - .contains(server.getTier())) { - return; - } + String segmentId = segment.getIdentifier(); synchronized (lock) { @@ -224,10 +243,6 @@ public class BrokerServerView implements TimelineServerView private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { - if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers() - .contains(server.getTier())) { - return; - } String segmentId = segment.getIdentifier(); final ServerSelector selector; @@ -301,6 +316,6 @@ public class BrokerServerView implements TimelineServerView @Override public void registerSegmentCallback(Executor exec, SegmentCallback callback) { - baseView.registerSegmentCallback(exec, callback); + baseView.registerSegmentCallback(exec, callback, segmentFilter); } } diff --git a/server/src/main/java/io/druid/client/FilteredBatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredBatchServerInventoryViewProvider.java new file mode 100644 index 00000000000..581470cee1d --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredBatchServerInventoryViewProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import com.metamx.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +public class FilteredBatchServerInventoryViewProvider implements FilteredServerInventoryViewProvider +{ + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public BatchServerInventoryView get() + { + return new BatchServerInventoryView( + zkPaths, + curator, + jsonMapper, + Predicates.>alwaysFalse() + ); + } +} diff --git a/server/src/main/java/io/druid/client/FilteredServerInventoryView.java b/server/src/main/java/io/druid/client/FilteredServerInventoryView.java new file mode 100644 index 00000000000..91196424088 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredServerInventoryView.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.google.common.base.Predicate; +import com.metamx.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.concurrent.Executor; + +public interface FilteredServerInventoryView extends InventoryView +{ + public void registerSegmentCallback( + Executor exec, ServerView.SegmentCallback callback, Predicate> filter + ); + + public void registerServerCallback(Executor exec, ServerView.ServerCallback callback); +} diff --git a/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java new file mode 100644 index 00000000000..f6f7fa74608 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.inject.Provider; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class), + @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class) +}) +public interface FilteredServerInventoryViewProvider extends Provider +{ +} diff --git a/server/src/main/java/io/druid/client/FilteredSingleServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredSingleServerInventoryViewProvider.java new file mode 100644 index 00000000000..37c6406af4e --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredSingleServerInventoryViewProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import com.metamx.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +public class FilteredSingleServerInventoryViewProvider implements FilteredServerInventoryViewProvider +{ + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public SingleServerInventoryView get() + { + return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.>alwaysFalse()); + } +} diff --git a/server/src/main/java/io/druid/client/ServerInventoryView.java b/server/src/main/java/io/druid/client/ServerInventoryView.java index e103e9f7f64..9aa32e28d76 100644 --- a/server/src/main/java/io/druid/client/ServerInventoryView.java +++ b/server/src/main/java/io/druid/client/ServerInventoryView.java @@ -240,6 +240,7 @@ public abstract class ServerInventoryView implements ServerView, public void run() { if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + segmentCallbackRemoved(entry.getKey()); segmentCallbacks.remove(entry.getKey()); } } @@ -342,4 +343,6 @@ public abstract class ServerInventoryView implements ServerView, final DruidServer container, String inventoryKey ); + + protected abstract void segmentCallbackRemoved(SegmentCallback callback); } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java index 76d170a34af..f7c1e2749f1 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java @@ -22,6 +22,8 @@ package io.druid.client; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; +import com.metamx.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -47,6 +49,11 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide @Override public ServerInventoryView get() { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper); + return new SingleServerInventoryView( + zkPaths, + curator, + jsonMapper, + Predicates.>alwaysTrue() + ); } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 3a51cf0dd96..929644e0fad 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -21,25 +21,39 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.MapMaker; import com.google.inject.Inject; +import com.metamx.common.Pair; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView +public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); + final private ConcurrentMap>> segmentPredicates = new MapMaker() + .makeMap(); + private final Predicate> defaultFilter; + @Inject public SingleServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final Predicate> defaultFilter ) { super( @@ -52,6 +66,9 @@ public class SingleServerInventoryView extends ServerInventoryView { } ); + + Preconditions.checkNotNull(defaultFilter); + this.defaultFilter = defaultFilter; } @Override @@ -59,7 +76,13 @@ public class SingleServerInventoryView extends ServerInventoryView DruidServer container, String inventoryKey, DataSegment inventory ) { - addSingleInventory(container, inventory); + Predicate> predicate = Predicates.or( + defaultFilter, + Predicates.or(segmentPredicates.values()) + ); + if (predicate.apply(Pair.of(container.getMetadata(), inventory))) { + addSingleInventory(container, inventory); + } return container; } @@ -77,4 +100,68 @@ public class SingleServerInventoryView extends ServerInventoryView removeSingleInventory(container, inventoryKey); return container; } + + public void registerSegmentCallback( + final Executor exec, + final SegmentCallback callback, + final Predicate> filter + ) + { + SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter); + segmentPredicates.put(filteringCallback, filter); + registerSegmentCallback( + exec, + filteringCallback + ); + } + + @Override + protected void segmentCallbackRemoved(SegmentCallback callback) + { + segmentPredicates.remove(callback); + } + + static class FilteringSegmentCallback implements SegmentCallback + { + + private final SegmentCallback callback; + private final Predicate> filter; + + FilteringSegmentCallback(SegmentCallback callback, Predicate> filter) + { + this.callback = callback; + this.filter = filter; + } + + @Override + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) + { + final CallbackAction action; + if (filter.apply(Pair.of(server, segment))) { + action = callback.segmentAdded(server, segment); + } else { + action = CallbackAction.CONTINUE; + } + return action; + } + + @Override + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) + { + final CallbackAction action; + if (filter.apply(Pair.of(server, segment))) { + action = callback.segmentRemoved(server, segment); + } else { + action = CallbackAction.CONTINUE; + } + return action; + } + + @Override + public CallbackAction segmentViewInitialized() + { + return callback.segmentViewInitialized(); + } + } + } diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index 356122e65a8..e88b1dbc55f 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -21,6 +21,8 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; +import io.druid.client.FilteredServerInventoryView; +import io.druid.client.FilteredServerInventoryViewProvider; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -34,8 +36,12 @@ public class ServerViewModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); + JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerInventoryViewProvider.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); + binder.bind(FilteredServerInventoryView.class) + .toProvider(FilteredServerInventoryViewProvider.class) + .in(ManageLifecycle.class); } } diff --git a/server/src/main/java/io/druid/server/ClientInfoResource.java b/server/src/main/java/io/druid/server/ClientInfoResource.java index b5de6f5c582..e3a653fe371 100644 --- a/server/src/main/java/io/druid/server/ClientInfoResource.java +++ b/server/src/main/java/io/druid/server/ClientInfoResource.java @@ -28,6 +28,7 @@ import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; +import io.druid.client.FilteredServerInventoryView; import io.druid.client.InventoryView; import io.druid.client.TimelineServerView; import io.druid.client.selector.ServerSelector; @@ -63,13 +64,13 @@ public class ClientInfoResource private static final String KEY_DIMENSIONS = "dimensions"; private static final String KEY_METRICS = "metrics"; - private InventoryView serverInventoryView; + private FilteredServerInventoryView serverInventoryView; private TimelineServerView timelineServerView; private SegmentMetadataQueryConfig segmentMetadataQueryConfig; @Inject public ClientInfoResource( - InventoryView serverInventoryView, + FilteredServerInventoryView serverInventoryView, TimelineServerView timelineServerView, SegmentMetadataQueryConfig segmentMetadataQueryConfig ) diff --git a/server/src/test/java/io/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/io/druid/client/BrokerSegmentWatcherConfigTest.java index 95a90bd7dd3..a1a786fd0a1 100644 --- a/server/src/test/java/io/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/io/druid/client/BrokerSegmentWatcherConfigTest.java @@ -47,7 +47,7 @@ public class BrokerSegmentWatcherConfigTest Assert.assertNull(config.getWatchedTiers()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -57,5 +57,7 @@ public class BrokerSegmentWatcherConfigTest ); Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); + Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + } } diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 24a5d7510a1..2eee70602e9 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import com.google.common.base.Function; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -67,7 +68,7 @@ public class BrokerServerViewTest extends CuratorTestBase private CountDownLatch segmentAddedLatch; private CountDownLatch segmentRemovedLatch; - private ServerInventoryView baseView; + private BatchServerInventoryView baseView; private BrokerServerView brokerServerView; public BrokerServerViewTest() @@ -289,7 +290,8 @@ public class BrokerServerViewTest extends CuratorTestBase baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper + jsonMapper, + Predicates.>alwaysTrue() ) { @Override diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index 17636c03122..234b765b1b9 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -21,6 +21,7 @@ package io.druid.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -291,7 +292,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase baseView = new BatchServerInventoryView( zkPathsConfig, curator, - jsonMapper + jsonMapper, + Predicates.>alwaysTrue() ) { @Override diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index df30f9abeff..9f01467780d 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -20,6 +20,8 @@ package io.druid.client.client; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -29,8 +31,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; +import com.metamx.common.Pair; import io.druid.client.BatchServerInventoryView; import io.druid.client.DruidServer; +import io.druid.client.ServerView; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.announcement.Announcer; import io.druid.jackson.DefaultObjectMapper; @@ -44,6 +48,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.LogicalOperator; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -53,7 +60,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -152,7 +161,8 @@ public class BatchServerInventoryViewTest } }, cf, - jsonMapper + jsonMapper, + Predicates.>alwaysTrue() ); batchServerInventoryView.start(); @@ -167,9 +177,16 @@ public class BatchServerInventoryViewTest } }, cf, - jsonMapper - ) - { + jsonMapper, + new Predicate>() + { + @Override + public boolean apply(@Nullable Pair input) + { + return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS)); + } + } + ){ @Override protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, Set inventory @@ -228,6 +245,122 @@ public class BatchServerInventoryViewTest Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values())); } + @Test + public void testRunWithFilter() throws Exception + { + segmentAnnouncer.announceSegments(testSegments); + + waitForSync(filteredBatchServerInventoryView, testSegments); + + DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); + Set segments = Sets.newHashSet(server.getSegments().values()); + + Assert.assertEquals(testSegments, segments); + int prevUpdateCount = inventoryUpdateCounter.get(); + // segment outside the range of default filter + DataSegment segment1 = makeSegment(101); + segmentAnnouncer.announceSegment(segment1); + testSegments.add(segment1); + + waitForUpdateEvents(prevUpdateCount + 1); + Assert.assertNull( + Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory()) + .getSegment(segment1.getIdentifier()) + ); + } + + @Test + public void testRunWithFilterCallback() throws Exception + { + final CountDownLatch removeCallbackLatch = new CountDownLatch(1); + + segmentAnnouncer.announceSegments(testSegments); + + waitForSync(filteredBatchServerInventoryView, testSegments); + + DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0); + Set segments = Sets.newHashSet(server.getSegments().values()); + + Assert.assertEquals(testSegments, segments); + + ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class); + Comparator dataSegmentComparator = new Comparator() + { + @Override + public int compare(DataSegment o1, DataSegment o2) + { + return o1.getInterval().equals(o2.getInterval()) ? 0 : -1; + } + }; + + EasyMock + .expect( + callback.segmentAdded( + EasyMock.anyObject(), + EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) + ) + ) + .andReturn(ServerView.CallbackAction.CONTINUE) + .times(1); + + EasyMock + .expect( + callback.segmentRemoved( + EasyMock.anyObject(), + EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL) + ) + ) + .andAnswer( + new IAnswer() + { + @Override + public ServerView.CallbackAction answer() throws Throwable + { + removeCallbackLatch.countDown(); + return ServerView.CallbackAction.CONTINUE; + } + } + ) + .times(1); + + + EasyMock.replay(callback); + + filteredBatchServerInventoryView.registerSegmentCallback( + MoreExecutors.sameThreadExecutor(), + callback, + new Predicate>() + { + @Override + public boolean apply(@Nullable Pair input) + { + return input.rhs.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2)); + } + } + ); + + DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2); + segmentAnnouncer.announceSegment(segment2); + testSegments.add(segment2); + + DataSegment oldSegment = makeSegment(-1); + segmentAnnouncer.announceSegment(oldSegment); + testSegments.add(oldSegment); + + segmentAnnouncer.unannounceSegment(oldSegment); + testSegments.remove(oldSegment); + + waitForSync(filteredBatchServerInventoryView, testSegments); + + segmentAnnouncer.unannounceSegment(segment2); + testSegments.remove(segment2); + + waitForSync(filteredBatchServerInventoryView, testSegments); + timing.forWaiting().awaitLatch(removeCallbackLatch); + + EasyMock.verify(callback); + } + private DataSegment makeSegment(int offset) { return DataSegment.builder() @@ -264,11 +397,7 @@ public class BatchServerInventoryViewTest while (inventoryUpdateCounter.get() != count) { Thread.sleep(100); if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) { - throw new ISE( - "BatchServerInventoryView is not updating counter expected[%d] value[%d]", - count, - inventoryUpdateCounter.get() - ); + throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get()); } } } @@ -332,7 +461,7 @@ public class BatchServerInventoryViewTest List segments = new ArrayList(); try { for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { - segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); + segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j)); } latch.countDown(); latch.await(); diff --git a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java index 2fec13f7318..a81938a7284 100644 --- a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; import io.druid.client.DruidServer; +import io.druid.client.FilteredServerInventoryView; import io.druid.client.InventoryView; import io.druid.client.TimelineServerView; import io.druid.client.selector.ServerSelector; @@ -70,7 +71,7 @@ public class ClientInfoResourceTest private final String dataSource = "test-data-source"; - private InventoryView serverInventoryView; + private FilteredServerInventoryView serverInventoryView; private TimelineServerView timelineServerView; private ClientInfoResource resource; @@ -130,7 +131,7 @@ public class ClientInfoResourceTest new NumberedShardSpec(0, 2) ); - serverInventoryView = EasyMock.createMock(InventoryView.class); + serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class); EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes(); timelineServerView = EasyMock.createMock(TimelineServerView.class); @@ -405,7 +406,7 @@ public class ClientInfoResourceTest } private ClientInfoResource getResourceTestHelper( - InventoryView serverInventoryView, + FilteredServerInventoryView serverInventoryView, TimelineServerView timelineServerView, SegmentMetadataQueryConfig segmentMetadataQueryConfig )