diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 3a1bf636aa2..ebb808b3d73 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -21,33 +21,42 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; +import io.druid.segment.filter.Filters; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; /** */ @ManageLifecycle -public class BatchServerInventoryView extends ServerInventoryView> +public class BatchServerInventoryView extends ServerInventoryView> implements FilteredServerView { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); - final ConcurrentMap> zNodes = new MapMaker().makeMap(); + final private ConcurrentMap> zNodes = new MapMaker().makeMap(); + final private Map> segmentPredicates = new MapMaker().makeMap(); + final private Predicate defaultFilter; @Inject public BatchServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final Predicate defaultFilter ) { super( @@ -58,6 +67,12 @@ public class BatchServerInventoryView extends ServerInventoryView>(){} ); + + if(defaultFilter != null) { + this.defaultFilter = defaultFilter; + } else { + this.defaultFilter = Predicates.alwaysTrue(); + } } @Override @@ -67,8 +82,11 @@ public class BatchServerInventoryView extends ServerInventoryView inventory ) { - zNodes.put(inventoryKey, inventory); - for (DataSegment segment : inventory) { + Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); + Set filteredInventory = Sets.filter(inventory, predicate); + + zNodes.put(inventoryKey, filteredInventory); + for (DataSegment segment : filteredInventory) { addSingleInventory(container, segment); } return container; @@ -79,18 +97,21 @@ public class BatchServerInventoryView extends ServerInventoryView inventory ) { + Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); + Set filteredInventory = Sets.filter(inventory, predicate); + Set existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); } - for (DataSegment segment : Sets.difference(inventory, existing)) { + for (DataSegment segment : Sets.difference(filteredInventory, existing)) { addSingleInventory(container, segment); } - for (DataSegment segment : Sets.difference(existing, inventory)) { + for (DataSegment segment : Sets.difference(existing, filteredInventory)) { removeSingleInventory(container, segment.getIdentifier()); } - zNodes.put(inventoryKey, inventory); + zNodes.put(inventoryKey, filteredInventory); return container; } @@ -111,4 +132,40 @@ public class BatchServerInventoryView extends ServerInventoryView filter + ) + { + segmentPredicates.put(callback, filter); + registerSegmentCallback( + exec, new SegmentCallback() + { + @Override + public CallbackAction segmentAdded( + DruidServerMetadata server, DataSegment segment + ) + { + final CallbackAction action = callback.segmentAdded(server, segment); + if (action.equals(CallbackAction.UNREGISTER)) { + segmentPredicates.remove(callback); + } + return action; + } + + @Override + public CallbackAction segmentRemoved( + DruidServerMetadata server, DataSegment segment + ) + { + final CallbackAction action = callback.segmentRemoved(server, segment); + if (action.equals(CallbackAction.UNREGISTER)) { + segmentPredicates.remove(callback); + } + return action; + } + } + ); + } } diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java index 609a47754aa..dd9304f8a4a 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryViewProvider.java @@ -43,8 +43,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv private ObjectMapper jsonMapper = null; @Override - public ServerInventoryView get() + public BatchServerInventoryView get() { - return new BatchServerInventoryView(zkPaths, curator, jsonMapper); + return new BatchServerInventoryView(zkPaths, curator, jsonMapper, null); } } diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 3495692d40d..57663154156 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -36,6 +36,7 @@ import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.TableDataSource; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -61,7 +62,7 @@ public class BrokerServerView implements TimelineServerView private final QueryToolChestWarehouse warehouse; private final ObjectMapper smileMapper; private final HttpClient httpClient; - private final ServerView baseView; + private final ServerInventoryView baseView; private final TierSelectorStrategy tierSelectorStrategy; @Inject @@ -69,7 +70,7 @@ public class BrokerServerView implements TimelineServerView QueryToolChestWarehouse warehouse, ObjectMapper smileMapper, @Client HttpClient httpClient, - ServerView baseView, + ServerInventoryView baseView, TierSelectorStrategy tierSelectorStrategy ) { @@ -89,14 +90,14 @@ public class BrokerServerView implements TimelineServerView new ServerView.SegmentCallback() { @Override - public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { serverAddedSegment(server, segment); return ServerView.CallbackAction.CONTINUE; } @Override - public ServerView.CallbackAction segmentRemoved(final DruidServer server, DataSegment segment) + public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment) { serverRemovedSegment(server, segment); return ServerView.CallbackAction.CONTINUE; @@ -159,12 +160,12 @@ public class BrokerServerView implements TimelineServerView private QueryableDruidServer removeServer(DruidServer server) { for (DataSegment segment : server.getSegments().values()) { - serverRemovedSegment(server, segment); + serverRemovedSegment(server.getMetadata(), segment); } return clients.remove(server.getName()); } - private void serverAddedSegment(final DruidServer server, final DataSegment segment) + private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { String segmentId = segment.getIdentifier(); synchronized (lock) { @@ -176,7 +177,7 @@ public class BrokerServerView implements TimelineServerView VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); if (timeline == null) { - timeline = new VersionedIntervalTimeline(Ordering.natural()); + timeline = new VersionedIntervalTimeline<>(Ordering.natural()); timelines.put(segment.getDataSource(), timeline); } @@ -186,13 +187,13 @@ public class BrokerServerView implements TimelineServerView QueryableDruidServer queryableDruidServer = clients.get(server.getName()); if (queryableDruidServer == null) { - queryableDruidServer = addServer(server); + queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); } selector.addServer(queryableDruidServer); } } - private void serverRemovedSegment(DruidServer server, DataSegment segment) + private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { String segmentId = segment.getIdentifier(); final ServerSelector selector; diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index ff374c3c4b1..cf5f09228f6 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -55,6 +55,7 @@ import io.druid.query.Result; import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.MetricManipulatorFns; import io.druid.query.spec.MultipleSpecificSegmentSpec; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -104,7 +105,7 @@ public class CachingClusteredClient implements QueryRunner new ServerView.BaseSegmentCallback() { @Override - public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) + public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { CachingClusteredClient.this.cache.close(segment.getIdentifier()); return ServerView.CallbackAction.CONTINUE; diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 22e48a2116a..b96c82361bb 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -125,6 +125,11 @@ public class DruidServer implements Comparable return metadata.getTier(); } + public boolean isAssignable() + { + return metadata.isAssignable(); + } + @JsonProperty public int getPriority() { @@ -138,11 +143,6 @@ public class DruidServer implements Comparable return Collections.unmodifiableMap(segments); } - public boolean isAssignable() - { - return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge"); - } - public DataSegment getSegment(String segmentName) { return segments.get(segmentName); diff --git a/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java new file mode 100644 index 00000000000..74b35d92d31 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredBatchServerViewProvider.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +public class FilteredBatchServerViewProvider implements FilteredServerViewProvider +{ + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public BatchServerInventoryView get() + { + return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); + } +} diff --git a/server/src/main/java/io/druid/client/FilteredServerView.java b/server/src/main/java/io/druid/client/FilteredServerView.java new file mode 100644 index 00000000000..91d4e6111f7 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredServerView.java @@ -0,0 +1,32 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client; + +import com.google.common.base.Predicate; +import io.druid.timeline.DataSegment; + +import java.util.concurrent.Executor; + +public interface FilteredServerView +{ + public void registerSegmentCallback( + Executor exec, ServerView.SegmentCallback callback, Predicate filter + ); +} diff --git a/server/src/main/java/io/druid/client/FilteredServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java new file mode 100644 index 00000000000..f9b20106ba2 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredServerViewProvider.java @@ -0,0 +1,34 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client; + + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.inject.Provider; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class), + @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class) +}) +public interface FilteredServerViewProvider extends Provider +{ +} diff --git a/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java b/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java new file mode 100644 index 00000000000..fdd03142190 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredSingleServerViewProvider.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +public class FilteredSingleServerViewProvider implements FilteredServerViewProvider +{ + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @JacksonInject + @NotNull + private ObjectMapper jsonMapper = null; + + @Override + public SingleServerInventoryView get() + { + return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.alwaysFalse()); + } +} diff --git a/server/src/main/java/io/druid/client/ServerInventoryView.java b/server/src/main/java/io/druid/client/ServerInventoryView.java index f9187828ac3..f19f166b97c 100644 --- a/server/src/main/java/io/druid/client/ServerInventoryView.java +++ b/server/src/main/java/io/druid/client/ServerInventoryView.java @@ -290,7 +290,7 @@ public abstract class ServerInventoryView implements ServerView, @Override public CallbackAction apply(SegmentCallback input) { - return input.segmentAdded(container, inventory); + return input.segmentAdded(container.getMetadata(), inventory); } } ); @@ -319,7 +319,7 @@ public abstract class ServerInventoryView implements ServerView, @Override public CallbackAction apply(SegmentCallback input) { - return input.segmentRemoved(container, segment); + return input.segmentRemoved(container.getMetadata(), segment); } } ); diff --git a/server/src/main/java/io/druid/client/ServerView.java b/server/src/main/java/io/druid/client/ServerView.java index cbd740269b8..67ad13efab4 100644 --- a/server/src/main/java/io/druid/client/ServerView.java +++ b/server/src/main/java/io/druid/client/ServerView.java @@ -19,6 +19,7 @@ package io.druid.client; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import java.util.concurrent.Executor; @@ -72,7 +73,7 @@ public interface ServerView * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback * should remain registered. */ - public CallbackAction segmentAdded(DruidServer server, DataSegment segment); + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment); /** * Called when a segment is removed from a server. @@ -89,19 +90,19 @@ public interface ServerView * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback * should remain registered. */ - public CallbackAction segmentRemoved(DruidServer server, DataSegment segment); + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment); } public static abstract class BaseSegmentCallback implements SegmentCallback { @Override - public CallbackAction segmentAdded(DruidServer server, DataSegment segment) + public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { return CallbackAction.CONTINUE; } @Override - public CallbackAction segmentRemoved(DruidServer server, DataSegment segment) + public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { return CallbackAction.CONTINUE; } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java index 7ebce938791..214a2331b7c 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryProvider.java @@ -45,6 +45,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide @Override public ServerInventoryView get() { - return new SingleServerInventoryView(zkPaths, curator, jsonMapper); + return new SingleServerInventoryView(zkPaths, curator, jsonMapper, null); } } diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 801b78c5d2a..084f24d7437 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -21,25 +21,38 @@ package io.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.guice.ManageLifecycle; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; + /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView +public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); + final private Map> segmentPredicates = new MapMaker().makeMap(); + private final Predicate defaultFilter; + @Inject public SingleServerInventoryView( final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final Predicate defaultFilter ) { super( @@ -50,14 +63,22 @@ public class SingleServerInventoryView extends ServerInventoryView jsonMapper, new TypeReference(){} ); - } + + if(defaultFilter != null) { + this.defaultFilter = defaultFilter; + } else { + this.defaultFilter = Predicates.alwaysTrue(); + } } @Override protected DruidServer addInnerInventory( DruidServer container, String inventoryKey, DataSegment inventory ) { - addSingleInventory(container, inventory); + Predicate predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values())); + if(predicate.apply(inventory)) { + addSingleInventory(container, inventory); + } return container; } @@ -75,4 +96,40 @@ public class SingleServerInventoryView extends ServerInventoryView removeSingleInventory(container, inventoryKey); return container; } + + @Override + public void registerSegmentCallback( + Executor exec, final SegmentCallback callback, Predicate filter + ) + { + segmentPredicates.put(callback, filter); + registerSegmentCallback( + exec, new SegmentCallback() + { + @Override + public CallbackAction segmentAdded( + DruidServerMetadata server, DataSegment segment + ) + { + final CallbackAction action = callback.segmentAdded(server, segment); + if (action.equals(CallbackAction.UNREGISTER)) { + segmentPredicates.remove(callback); + } + return action; + } + + @Override + public CallbackAction segmentRemoved( + DruidServerMetadata server, DataSegment segment + ) + { + final CallbackAction action = callback.segmentRemoved(server, segment); + if (action.equals(CallbackAction.UNREGISTER)) { + segmentPredicates.remove(callback); + } + return action; + } + } + ); + } } diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index 9ccee0984e0..2fd92bb4415 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -21,6 +21,8 @@ package io.druid.guice; import com.google.inject.Binder; import com.google.inject.Module; +import io.druid.client.FilteredServerView; +import io.druid.client.FilteredServerViewProvider; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -34,8 +36,10 @@ public class ServerViewModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); + JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); + binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 65592008336..306350aaaa3 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -2,6 +2,7 @@ package io.druid.segment.realtime.plumber; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -16,7 +17,7 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.client.DruidServer; +import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; @@ -43,6 +44,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -82,7 +84,7 @@ public class RealtimePlumber implements Plumber private final ExecutorService queryExecutorService; private final DataSegmentPusher dataSegmentPusher; private final SegmentPublisher segmentPublisher; - private final ServerView serverView; + private final FilteredServerView serverView; private final Object handoffCondition = new Object(); private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( @@ -104,7 +106,7 @@ public class RealtimePlumber implements Plumber ExecutorService queryExecutorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, - ServerView serverView + FilteredServerView serverView ) { this.schema = schema; @@ -731,7 +733,7 @@ public class RealtimePlumber implements Plumber new ServerView.BaseSegmentCallback() { @Override - public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { if (stopped) { log.info("Unregistering ServerViewCallback"); @@ -766,6 +768,26 @@ public class RealtimePlumber implements Plumber return ServerView.CallbackAction.CONTINUE; } + }, + new Predicate() + { + @Override + public boolean apply(final DataSegment segment) + { + return + schema.getDataSource().equals(segment.getDataSource()) + && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() + && Iterables.any( + sinks.keySet(), new Predicate() + { + @Override + public boolean apply(Long sinkKey) + { + return segment.getInterval().contains(sinkKey); + } + } + ); + } } ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 830b860611a..eb52a30ba31 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.metamx.common.Granularity; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -48,7 +49,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final DataSegmentPusher dataSegmentPusher; private final DataSegmentAnnouncer segmentAnnouncer; private final SegmentPublisher segmentPublisher; - private final ServerView serverView; + private final FilteredServerView serverView; private final ExecutorService queryExecutorService; // Backwards compatible @@ -66,7 +67,7 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject SegmentPublisher segmentPublisher, - @JacksonInject ServerView serverView, + @JacksonInject FilteredServerView serverView, @JacksonInject @Processing ExecutorService executorService, // Backwards compatible @JsonProperty("windowPeriod") Period windowPeriod, diff --git a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java index 400b270779c..9684eb1f19e 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java @@ -92,7 +92,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator { @Override public ServerView.CallbackAction segmentAdded( - DruidServer server, DataSegment theSegment + DruidServerMetadata server, DataSegment theSegment ) { if (theSegment.equals(segment)) { @@ -118,7 +118,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator { @Override public ServerView.CallbackAction segmentRemoved( - DruidServer server, DataSegment theSegment + DruidServerMetadata server, DataSegment theSegment ) { if (theSegment.equals(segment)) { diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index a029fe855f5..f0e7355a1c6 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -125,7 +125,7 @@ public class DruidClusterBridge { @Override public ServerView.CallbackAction segmentAdded( - DruidServer server, DataSegment segment + DruidServerMetadata server, DataSegment segment ) { try { @@ -147,7 +147,7 @@ public class DruidClusterBridge } @Override - public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) + public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { try { synchronized (lock) { @@ -172,7 +172,7 @@ public class DruidClusterBridge { try { for (DataSegment dataSegment : server.getSegments().values()) { - serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server); + serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server.getMetadata()); } } catch (Exception e) { @@ -370,7 +370,7 @@ public class DruidClusterBridge } } - private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server) + private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServerMetadata server) throws IOException { Integer count = segments.get(segment); diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index fe447d7350b..c32872523cd 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -87,6 +87,11 @@ public class DruidServerMetadata return priority; } + public boolean isAssignable() + { + return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge"); + } + @Override public boolean equals(Object o) { diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index f3bd64a40f6..4203193a78f 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -131,7 +131,8 @@ public class BatchServerInventoryViewTest } }, cf, - jsonMapper + jsonMapper, + null ); batchServerInventoryView.start(); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 884baeb1054..ddcb503af58 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.plumber; +import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.google.common.io.Files; @@ -27,6 +28,7 @@ import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.exception.FormattedException; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.FilteredServerView; import io.druid.client.ServerView; import io.druid.data.input.InputRow; import io.druid.data.input.impl.ParseSpec; @@ -48,11 +50,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.SegmentPublisher; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; -import junit.framework.Assert; import org.apache.commons.lang.mutable.MutableBoolean; import org.easymock.EasyMock; -import org.joda.time.DateTime; -import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; import org.junit.Before; @@ -71,7 +70,7 @@ public class RealtimePlumberSchoolTest private DataSegmentAnnouncer announcer; private SegmentPublisher segmentPublisher; private DataSegmentPusher dataSegmentPusher; - private ServerView serverView; + private FilteredServerView serverView; private ServiceEmitter emitter; @Before @@ -114,10 +113,11 @@ public class RealtimePlumberSchoolTest segmentPublisher = EasyMock.createMock(SegmentPublisher.class); dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class); - serverView = EasyMock.createMock(ServerView.class); + serverView = EasyMock.createMock(FilteredServerView.class); serverView.registerSegmentCallback( EasyMock.anyObject(), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.>anyObject() ); EasyMock.expectLastCall().anyTimes();