Add ability to filter segments for specific dataSources on broker without creating tiers (#2848)

* Add back FilteredServerView removed in a32906c7fd to reduce memory usage using watched tiers.

* Add functionality to specify "druid.broker.segment.watchedDataSources"
This commit is contained in:
Nishant 2016-04-19 22:40:06 +05:30 committed by Xavier Léauté
parent 08c784fbf6
commit dbf63f738f
20 changed files with 609 additions and 54 deletions

View File

@ -100,5 +100,6 @@ See [cache configuration](caching.html) for how to configure cache settings.
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchedTiers`|List of strings|The broker watches segment announcements from nodes serving segments in order to build a cache of which node is serving which segments. This configuration allows the broker to only consider segments being served from a whitelist of tiers. By default, the broker considers all tiers. This can be used to partition your dataSources into specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|The broker watches segment announcements from nodes serving segments in order to build a cache of which node is serving which segments. This configuration allows the broker to only consider segments being served from a whitelist of dataSources. By default, the broker considers all dataSources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|

View File

@ -22,6 +22,8 @@ package io.druid.indexing.test;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import io.druid.client.DruidServer;
import io.druid.client.FilteredServerInventoryView;
import io.druid.client.ServerView;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
@ -30,9 +32,25 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
public class TestServerView implements ServerView.SegmentCallback
public class TestServerView implements FilteredServerInventoryView, ServerView.SegmentCallback
{
final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> callbacks = Maps.newConcurrentMap();
final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> callbacks = Maps.newConcurrentMap();
/**
 * Records the callback together with its filter and executor. This test view does not
 * watch ZooKeeper; events are delivered only when the test invokes segmentAdded /
 * segmentRemoved / segmentViewInitialized on this object directly.
 */
@Override
public void registerSegmentCallback(
    final Executor exec,
    final ServerView.SegmentCallback callback,
    final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
  callbacks.put(callback, Pair.of(filter, exec));
}
/**
 * Server (as opposed to segment) callbacks are intentionally ignored by this test view.
 */
@Override
public void registerServerCallback(Executor exec, ServerView.ServerCallback callback)
{
  // No-op
}
@Override
public ServerView.CallbackAction segmentAdded(
@ -40,8 +58,8 @@ public class TestServerView implements ServerView.SegmentCallback
final DataSegment segment
)
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(segment)) {
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(Pair.of(server,segment))) {
entry.getValue().rhs.execute(
new Runnable()
{
@ -64,8 +82,8 @@ public class TestServerView implements ServerView.SegmentCallback
final DataSegment segment
)
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(segment)) {
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
if (entry.getValue().lhs.apply(Pair.of(server, segment))) {
entry.getValue().rhs.execute(
new Runnable()
{
@ -85,7 +103,7 @@ public class TestServerView implements ServerView.SegmentCallback
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
entry.getValue().rhs.execute(
new Runnable()
{
@ -100,4 +118,16 @@ public class TestServerView implements ServerView.SegmentCallback
return ServerView.CallbackAction.CONTINUE;
}
/**
 * Test stub: inventory lookup by key is not supported by this fake view.
 * NOTE(review): returns null rather than throwing — callers in tests presumably
 * never rely on this method; confirm before reusing this fake elsewhere.
 */
@Override
public DruidServer getInventoryValue(String string)
{
  return null;
}
/**
 * Test stub: enumeration of the inventory is not supported by this fake view.
 * NOTE(review): returns null instead of an empty Iterable — safe only if no test
 * iterates the result; confirm before reusing this fake elsewhere.
 */
@Override
public Iterable<DruidServer> getInventory()
{
  return null;
}
}

View File

@ -21,33 +21,46 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
*/
@ManageLifecycle
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
implements FilteredServerInventoryView
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
final private ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
.makeMap();
final private Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
@Inject
public BatchServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
)
{
super(
@ -60,6 +73,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
{
}
);
this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
}
@Override
@ -69,30 +84,70 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
final Set<DataSegment> inventory
)
{
zNodes.put(inventoryKey, inventory);
for (DataSegment segment : inventory) {
Set<DataSegment> filteredInventory = filterInventory(container, inventory);
zNodes.put(inventoryKey, filteredInventory);
for (DataSegment segment : filteredInventory) {
addSingleInventory(container, segment);
}
return container;
}
/**
 * Returns a copy of {@code inventory} containing only the segments accepted by the
 * default filter or by any predicate registered alongside a segment callback.
 * A copy of the set is made (rather than a lazily filtered view) so that segments
 * excluded by the filters are not kept referenced in memory.
 */
private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory)
{
  final Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
      defaultFilter,
      Predicates.or(segmentPredicates.values())
  );
  final DruidServerMetadata serverMetadata = container.getMetadata();
  final Set<DataSegment> filteredInventory = Sets.newHashSet();
  for (DataSegment segment : inventory) {
    if (predicate.apply(Pair.of(serverMetadata, segment))) {
      filteredInventory.add(segment);
    }
  }
  return filteredInventory;
}
@Override
protected DruidServer updateInnerInventory(
DruidServer container, String inventoryKey, Set<DataSegment> inventory
)
{
Set<DataSegment> filteredInventory = filterInventory(container, inventory);
Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
}
for (DataSegment segment : Sets.difference(inventory, existing)) {
for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
addSingleInventory(container, segment);
}
for (DataSegment segment : Sets.difference(existing, inventory)) {
for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
removeSingleInventory(container, segment.getIdentifier());
}
zNodes.put(inventoryKey, inventory);
zNodes.put(inventoryKey, filteredInventory);
return container;
}
@ -113,4 +168,24 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
}
return container;
}
/**
 * Registers a segment callback that only receives events for (server, segment) pairs
 * accepted by {@code filter}. The filter is also remembered in {@code segmentPredicates}
 * so that matching segments are retained when the inventory is filtered.
 *
 * Fix: added the missing {@code @Override} — this implements the
 * FilteredServerInventoryView interface method.
 */
@Override
public void registerSegmentCallback(
    final Executor exec,
    final SegmentCallback callback,
    final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
  // Wrap the callback so events not matching the filter are suppressed.
  final SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
  segmentPredicates.put(filteringCallback, filter);
  registerSegmentCallback(exec, filteringCallback);
}
/**
 * Called when a callback unregisters itself (by returning UNREGISTER); drops its
 * predicate so segments matching only that predicate stop being retained.
 */
@Override
protected void segmentCallbackRemoved(SegmentCallback callback)
{
  segmentPredicates.remove(callback);
}
}

View File

@ -22,6 +22,8 @@ package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -47,6 +49,11 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
@Override
public BatchServerInventoryView get()
{
return new BatchServerInventoryView(zkPaths, curator, jsonMapper);
return new BatchServerInventoryView(
zkPaths,
curator,
jsonMapper,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
);
}
}

View File

@ -30,8 +30,16 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedTiers = null;
@JsonProperty
private Set<String> watchedDataSources = null;
/**
 * @return the tiers whose segment announcements the broker watches, or null
 *         (the default) to watch all tiers.
 */
public Set<String> getWatchedTiers()
{
  return watchedTiers;
}
/**
 * @return the dataSources whose segment announcements the broker watches, or null
 *         (the default) to watch all dataSources.
 */
public Set<String> getWatchedDataSources()
{
  return watchedDataSources;
}
}

View File

@ -20,10 +20,12 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
@ -42,6 +44,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -64,10 +67,10 @@ public class BrokerServerView implements TimelineServerView
private final QueryWatcher queryWatcher;
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerInventoryView baseView;
private final FilteredServerInventoryView baseView;
private final TierSelectorStrategy tierSelectorStrategy;
private final ServiceEmitter emitter;
private final BrokerSegmentWatcherConfig segmentWatcherConfig;
private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
private volatile boolean initialized = false;
@ -77,10 +80,10 @@ public class BrokerServerView implements TimelineServerView
QueryWatcher queryWatcher,
@Smile ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerInventoryView baseView,
FilteredServerInventoryView baseView,
TierSelectorStrategy tierSelectorStrategy,
ServiceEmitter emitter,
BrokerSegmentWatcherConfig segmentWatcherConfig
final BrokerSegmentWatcherConfig segmentWatcherConfig
)
{
this.warehouse = warehouse;
@ -90,12 +93,30 @@ public class BrokerServerView implements TimelineServerView
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;
this.segmentWatcherConfig = segmentWatcherConfig;
this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap();
this.timelines = Maps.newHashMap();
this.segmentFilter = new Predicate<Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public boolean apply(
Pair<DruidServerMetadata, DataSegment> input
)
{
if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(input.lhs.getTier())) {
return false;
}
if (segmentWatcherConfig.getWatchedDataSources() != null
&& !segmentWatcherConfig.getWatchedDataSources().contains(input.rhs.getDataSource())) {
return false;
}
return true;
}
};
ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
baseView.registerSegmentCallback(
exec,
@ -121,7 +142,8 @@ public class BrokerServerView implements TimelineServerView
initialized = true;
return ServerView.CallbackAction.CONTINUE;
}
}
},
segmentFilter
);
baseView.registerServerCallback(
@ -191,10 +213,7 @@ public class BrokerServerView implements TimelineServerView
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers()
.contains(server.getTier())) {
return;
}
String segmentId = segment.getIdentifier();
synchronized (lock) {
@ -224,10 +243,6 @@ public class BrokerServerView implements TimelineServerView
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers()
.contains(server.getTier())) {
return;
}
String segmentId = segment.getIdentifier();
final ServerSelector selector;
@ -301,6 +316,6 @@ public class BrokerServerView implements TimelineServerView
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
baseView.registerSegmentCallback(exec, callback);
baseView.registerSegmentCallback(exec, callback, segmentFilter);
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
 * Provides a {@link BatchServerInventoryView} whose default filter is alwaysFalse, so
 * nothing is watched until callers register segment callbacks with explicit predicates.
 */
public class FilteredBatchServerInventoryViewProvider implements FilteredServerInventoryViewProvider
{
  @JacksonInject
  @NotNull
  private ZkPathsConfig zkPaths = null;

  @JacksonInject
  @NotNull
  private CuratorFramework curator = null;

  @JacksonInject
  @NotNull
  private ObjectMapper jsonMapper = null;

  @Override
  public BatchServerInventoryView get()
  {
    // alwaysFalse: by default no segments are kept; per-callback filters opt segments in.
    return new BatchServerInventoryView(
        zkPaths,
        curator,
        jsonMapper,
        Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysFalse()
    );
  }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.google.common.base.Predicate;
import com.metamx.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.concurrent.Executor;
/**
 * An {@link InventoryView} that lets callers register segment callbacks together with a
 * predicate restricting which (server metadata, segment) announcements they receive.
 */
public interface FilteredServerInventoryView extends InventoryView
{
  /**
   * Registers a segment callback; only pairs accepted by {@code filter} are delivered,
   * on the supplied executor.
   */
  public void registerSegmentCallback(
      Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
  );

  /**
   * Registers a server callback (no filtering), delivered on the supplied executor.
   */
  public void registerServerCallback(Executor exec, ServerView.ServerCallback callback);
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
/**
 * Guice/Jackson provider for {@link FilteredServerInventoryView} implementations,
 * selected by the JSON "type" property: "batch" (the default) or "legacy".
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class)
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class),
    @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class)
})
public interface FilteredServerInventoryViewProvider extends Provider<FilteredServerInventoryView>
{
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
/**
 * Provides a {@link SingleServerInventoryView} (the "legacy", one-znode-per-segment view)
 * whose default filter is alwaysFalse, so nothing is watched until callers register
 * segment callbacks with explicit predicates.
 */
public class FilteredSingleServerInventoryViewProvider implements FilteredServerInventoryViewProvider
{
  @JacksonInject
  @NotNull
  private ZkPathsConfig zkPaths = null;

  @JacksonInject
  @NotNull
  private CuratorFramework curator = null;

  @JacksonInject
  @NotNull
  private ObjectMapper jsonMapper = null;

  @Override
  public SingleServerInventoryView get()
  {
    // alwaysFalse: by default no segments are kept; per-callback filters opt segments in.
    return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysFalse());
  }
}

View File

@ -240,6 +240,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
@ -342,4 +343,6 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
final DruidServer container,
String inventoryKey
);
protected abstract void segmentCallbackRemoved(SegmentCallback callback);
}

View File

@ -22,6 +22,8 @@ package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.metamx.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -47,6 +49,11 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide
@Override
public ServerInventoryView get()
{
return new SingleServerInventoryView(zkPaths, curator, jsonMapper);
return new SingleServerInventoryView(
zkPaths,
curator,
jsonMapper,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
);
}
}

View File

@ -21,25 +21,39 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.MapMaker;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
*/
@ManageLifecycle
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerInventoryView
{
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
final private ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
.makeMap();
private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
@Inject
public SingleServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
)
{
super(
@ -52,6 +66,9 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
{
}
);
Preconditions.checkNotNull(defaultFilter);
this.defaultFilter = defaultFilter;
}
@Override
@ -59,7 +76,13 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
DruidServer container, String inventoryKey, DataSegment inventory
)
{
Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
defaultFilter,
Predicates.or(segmentPredicates.values())
);
if (predicate.apply(Pair.of(container.getMetadata(), inventory))) {
addSingleInventory(container, inventory);
}
return container;
}
@ -77,4 +100,68 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
removeSingleInventory(container, inventoryKey);
return container;
}
/**
 * Registers a segment callback that only receives events for (server, segment) pairs
 * accepted by {@code filter}. The filter is also remembered in {@code segmentPredicates}
 * so that matching segments are retained when inventory is added.
 *
 * Fix: added the missing {@code @Override} — this implements the
 * FilteredServerInventoryView interface method.
 */
@Override
public void registerSegmentCallback(
    final Executor exec,
    final SegmentCallback callback,
    final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
  // Wrap the callback so events not matching the filter are suppressed.
  final SegmentCallback filteringCallback = new FilteringSegmentCallback(callback, filter);
  segmentPredicates.put(filteringCallback, filter);
  registerSegmentCallback(exec, filteringCallback);
}
/**
 * Called when a callback unregisters itself (by returning UNREGISTER); drops its
 * predicate so segments matching only that predicate stop being retained.
 */
@Override
protected void segmentCallbackRemoved(SegmentCallback callback)
{
  segmentPredicates.remove(callback);
}
/**
 * A {@link SegmentCallback} decorator that forwards segmentAdded / segmentRemoved events
 * only for (server, segment) pairs accepted by the supplied filter. Suppressed events
 * still return CONTINUE so the callback stays registered. segmentViewInitialized is
 * always forwarded unconditionally.
 */
static class FilteringSegmentCallback implements SegmentCallback
{
  private final SegmentCallback callback;
  private final Predicate<Pair<DruidServerMetadata, DataSegment>> filter;

  FilteringSegmentCallback(SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter)
  {
    this.callback = callback;
    this.filter = filter;
  }

  @Override
  public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
  {
    // Swallow the event (but keep listening) when the pair does not match the filter.
    return filter.apply(Pair.of(server, segment))
           ? callback.segmentAdded(server, segment)
           : CallbackAction.CONTINUE;
  }

  @Override
  public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
  {
    return filter.apply(Pair.of(server, segment))
           ? callback.segmentRemoved(server, segment)
           : CallbackAction.CONTINUE;
  }

  @Override
  public CallbackAction segmentViewInitialized()
  {
    // Initialization is a global event and is never filtered.
    return callback.segmentViewInitialized();
  }
}
}

View File

@ -21,6 +21,8 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.client.FilteredServerInventoryView;
import io.druid.client.FilteredServerInventoryViewProvider;
import io.druid.client.InventoryView;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerInventoryViewProvider;
@ -34,8 +36,12 @@ public class ServerViewModule implements Module
public void configure(Binder binder)
{
  // Both the unfiltered and filtered inventory-view providers are bound under the same
  // "druid.announcer" property prefix, so they share the same runtime configuration.
  JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
  JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerInventoryViewProvider.class);
  binder.bind(InventoryView.class).to(ServerInventoryView.class);
  binder.bind(ServerView.class).to(ServerInventoryView.class);
  // Both views are lifecycle-managed singletons created from their providers.
  binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
  binder.bind(FilteredServerInventoryView.class)
        .toProvider(FilteredServerInventoryViewProvider.class)
        .in(ManageLifecycle.class);
}
}

View File

@ -28,6 +28,7 @@ import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.FilteredServerInventoryView;
import io.druid.client.InventoryView;
import io.druid.client.TimelineServerView;
import io.druid.client.selector.ServerSelector;
@ -63,13 +64,13 @@ public class ClientInfoResource
private static final String KEY_DIMENSIONS = "dimensions";
private static final String KEY_METRICS = "metrics";
private InventoryView serverInventoryView;
private FilteredServerInventoryView serverInventoryView;
private TimelineServerView timelineServerView;
private SegmentMetadataQueryConfig segmentMetadataQueryConfig;
@Inject
public ClientInfoResource(
InventoryView serverInventoryView,
FilteredServerInventoryView serverInventoryView,
TimelineServerView timelineServerView,
SegmentMetadataQueryConfig segmentMetadataQueryConfig
)

View File

@ -47,7 +47,7 @@ public class BrokerSegmentWatcherConfigTest
Assert.assertNull(config.getWatchedTiers());
//non-defaults
json = "{ \"watchedTiers\": [\"t1\", \"t2\"] }";
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
config = MAPPER.readValue(
MAPPER.writeValueAsString(
@ -57,5 +57,7 @@ public class BrokerSegmentWatcherConfigTest
);
Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -67,7 +68,7 @@ public class BrokerServerViewTest extends CuratorTestBase
private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch;
private ServerInventoryView baseView;
private BatchServerInventoryView baseView;
private BrokerServerView brokerServerView;
public BrokerServerViewTest()
@ -289,7 +290,8 @@ public class BrokerServerViewTest extends CuratorTestBase
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
jsonMapper
jsonMapper,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
)
{
@Override

View File

@ -21,6 +21,7 @@ package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -291,7 +292,8 @@ public class CoordinatorServerViewTest extends CuratorTestBase
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
jsonMapper
jsonMapper,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
)
{
@Override

View File

@ -20,6 +20,8 @@
package io.druid.client.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -29,8 +31,10 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
@ -44,6 +48,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.Timing;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.LogicalOperator;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@ -53,7 +60,9 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@ -152,7 +161,8 @@ public class BatchServerInventoryViewTest
}
},
cf,
jsonMapper
jsonMapper,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
);
batchServerInventoryView.start();
@ -167,9 +177,16 @@ public class BatchServerInventoryViewTest
}
},
cf,
jsonMapper
)
jsonMapper,
new Predicate<Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> input)
{
return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
}
}
){
@Override
protected DruidServer addInnerInventory(
DruidServer container, String inventoryKey, Set<DataSegment> inventory
@ -228,6 +245,122 @@ public class BatchServerInventoryViewTest
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
}
/**
 * Verifies that the filtered inventory view drops announced segments that do not satisfy
 * its filter predicate: a segment whose interval starts outside the default filter's
 * accepted range is announced but must never appear in the view's inventory.
 */
@Test
public void testRunWithFilter() throws Exception
{
// Announce the initial (filter-passing) segments and block until the view has synced them.
segmentAnnouncer.announceSegments(testSegments);
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
Assert.assertEquals(testSegments, segments);
// Remember the update counter so we can detect that the announcement below was processed.
int prevUpdateCount = inventoryUpdateCounter.get();
// segment outside the range of default filter
DataSegment segment1 = makeSegment(101);
segmentAnnouncer.announceSegment(segment1);
testSegments.add(segment1);
// Wait for the inventory update event; the view processed the announcement but should
// have filtered the segment out rather than adding it.
waitForUpdateEvents(prevUpdateCount + 1);
// The filtered-out segment must not be visible on the (single) server in the inventory.
Assert.assertNull(
Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory())
.getSegment(segment1.getIdentifier())
);
}
/**
 * Verifies that segment callbacks registered with a filter predicate fire only for
 * segments matching that predicate. A strict EasyMock callback expects exactly one
 * segmentAdded and one segmentRemoved for the matching segment; announcing and
 * unannouncing a non-matching segment must trigger no callback at all (the strict
 * mock would fail verification otherwise).
 */
@Test
public void testRunWithFilterCallback() throws Exception
{
// Released by the mocked segmentRemoved() so the test can wait for the remove callback.
final CountDownLatch removeCallbackLatch = new CountDownLatch(1);
// Sync the initial segments into the filtered view first.
segmentAnnouncer.announceSegments(testSegments);
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
Assert.assertEquals(testSegments, segments);
// Strict mock: both the set of calls and their order are verified.
ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class);
// Segments are considered "equal" for matching purposes when their intervals match;
// 0 means equal, any other pair compares as -1 (ordering is irrelevant here).
Comparator<DataSegment> dataSegmentComparator = new Comparator<DataSegment>()
{
@Override
public int compare(DataSegment o1, DataSegment o2)
{
return o1.getInterval().equals(o2.getInterval()) ? 0 : -1;
}
};
// Expect exactly one add for the segment matching the callback filter below.
EasyMock
.expect(
callback.segmentAdded(
EasyMock.<DruidServerMetadata>anyObject(),
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
)
)
.andReturn(ServerView.CallbackAction.CONTINUE)
.times(1);
// Expect exactly one remove for the same segment; release the latch when it arrives.
EasyMock
.expect(
callback.segmentRemoved(
EasyMock.<DruidServerMetadata>anyObject(),
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
)
)
.andAnswer(
new IAnswer<ServerView.CallbackAction>()
{
@Override
public ServerView.CallbackAction answer() throws Throwable
{
removeCallbackLatch.countDown();
return ServerView.CallbackAction.CONTINUE;
}
}
)
.times(1);
EasyMock.replay(callback);
// Register the callback with a predicate that only matches segments starting exactly
// at SEGMENT_INTERVAL_START + (INITIAL_SEGMENTS + 2) days.
filteredBatchServerInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
callback,
new Predicate<Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> input)
{
return input.rhs.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2));
}
}
);
// Matching segment: should produce the expected add (and later remove) callback.
DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2);
segmentAnnouncer.announceSegment(segment2);
testSegments.add(segment2);
// Non-matching segment: announced and unannounced without any expected callback.
DataSegment oldSegment = makeSegment(-1);
segmentAnnouncer.announceSegment(oldSegment);
testSegments.add(oldSegment);
segmentAnnouncer.unannounceSegment(oldSegment);
testSegments.remove(oldSegment);
waitForSync(filteredBatchServerInventoryView, testSegments);
// Remove the matching segment, which should fire the expected segmentRemoved callback.
segmentAnnouncer.unannounceSegment(segment2);
testSegments.remove(segment2);
waitForSync(filteredBatchServerInventoryView, testSegments);
// Wait (with timeout) for the remove callback before verifying the strict mock.
timing.forWaiting().awaitLatch(removeCallbackLatch);
EasyMock.verify(callback);
}
private DataSegment makeSegment(int offset)
{
return DataSegment.builder()
@ -264,11 +397,7 @@ public class BatchServerInventoryViewTest
while (inventoryUpdateCounter.get() != count) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
throw new ISE(
"BatchServerInventoryView is not updating counter expected[%d] value[%d]",
count,
inventoryUpdateCounter.get()
);
throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get());
}
}
}

View File

@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import io.druid.client.DruidServer;
import io.druid.client.FilteredServerInventoryView;
import io.druid.client.InventoryView;
import io.druid.client.TimelineServerView;
import io.druid.client.selector.ServerSelector;
@ -70,7 +71,7 @@ public class ClientInfoResourceTest
private final String dataSource = "test-data-source";
private InventoryView serverInventoryView;
private FilteredServerInventoryView serverInventoryView;
private TimelineServerView timelineServerView;
private ClientInfoResource resource;
@ -130,7 +131,7 @@ public class ClientInfoResourceTest
new NumberedShardSpec(0, 2)
);
serverInventoryView = EasyMock.createMock(InventoryView.class);
serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class);
EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes();
timelineServerView = EasyMock.createMock(TimelineServerView.class);
@ -405,7 +406,7 @@ public class ClientInfoResourceTest
}
private ClientInfoResource getResourceTestHelper(
InventoryView serverInventoryView,
FilteredServerInventoryView serverInventoryView,
TimelineServerView timelineServerView,
SegmentMetadataQueryConfig segmentMetadataQueryConfig
)