FilteredServerView

This commit is contained in:
Xavier Léauté 2014-05-28 16:35:21 -07:00
parent 53638a74ec
commit d98a10a7d8
21 changed files with 371 additions and 55 deletions

View File

@ -21,33 +21,42 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.segment.filter.Filters;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/** /**
*/ */
@ManageLifecycle @ManageLifecycle
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>> public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>> implements FilteredServerView
{ {
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap(); final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
final private Map<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
final private Predicate<DataSegment> defaultFilter;
@Inject @Inject
public BatchServerInventoryView( public BatchServerInventoryView(
final ZkPathsConfig zkPaths, final ZkPathsConfig zkPaths,
final CuratorFramework curator, final CuratorFramework curator,
final ObjectMapper jsonMapper final ObjectMapper jsonMapper,
final Predicate<DataSegment> defaultFilter
) )
{ {
super( super(
@ -58,6 +67,12 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
jsonMapper, jsonMapper,
new TypeReference<Set<DataSegment>>(){} new TypeReference<Set<DataSegment>>(){}
); );
if(defaultFilter != null) {
this.defaultFilter = defaultFilter;
} else {
this.defaultFilter = Predicates.alwaysTrue();
}
} }
@Override @Override
@ -67,8 +82,11 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
final Set<DataSegment> inventory final Set<DataSegment> inventory
) )
{ {
zNodes.put(inventoryKey, inventory); Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
for (DataSegment segment : inventory) { Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
zNodes.put(inventoryKey, filteredInventory);
for (DataSegment segment : filteredInventory) {
addSingleInventory(container, segment); addSingleInventory(container, segment);
} }
return container; return container;
@ -79,18 +97,21 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
DruidServer container, String inventoryKey, Set<DataSegment> inventory DruidServer container, String inventoryKey, Set<DataSegment> inventory
) )
{ {
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
Set<DataSegment> existing = zNodes.get(inventoryKey); Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) { if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
} }
for (DataSegment segment : Sets.difference(inventory, existing)) { for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
addSingleInventory(container, segment); addSingleInventory(container, segment);
} }
for (DataSegment segment : Sets.difference(existing, inventory)) { for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
removeSingleInventory(container, segment.getIdentifier()); removeSingleInventory(container, segment.getIdentifier());
} }
zNodes.put(inventoryKey, inventory); zNodes.put(inventoryKey, filteredInventory);
return container; return container;
} }
@ -111,4 +132,40 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
} }
return container; return container;
} }
@Override
public void registerSegmentCallback(
Executor exec, final SegmentCallback callback, Predicate<DataSegment> filter
)
{
segmentPredicates.put(callback, filter);
registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action = callback.segmentAdded(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
return action;
}
@Override
public CallbackAction segmentRemoved(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action = callback.segmentRemoved(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
return action;
}
}
);
}
} }

View File

@ -43,8 +43,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
private ObjectMapper jsonMapper = null; private ObjectMapper jsonMapper = null;
@Override @Override
public ServerInventoryView get() public BatchServerInventoryView get()
{ {
return new BatchServerInventoryView(zkPaths, curator, jsonMapper); return new BatchServerInventoryView(zkPaths, curator, jsonMapper, null);
} }
} }

View File

@ -36,6 +36,7 @@ import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource; import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionChunk;
@ -61,7 +62,7 @@ public class BrokerServerView implements TimelineServerView
private final QueryToolChestWarehouse warehouse; private final QueryToolChestWarehouse warehouse;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
private final HttpClient httpClient; private final HttpClient httpClient;
private final ServerView baseView; private final ServerInventoryView baseView;
private final TierSelectorStrategy tierSelectorStrategy; private final TierSelectorStrategy tierSelectorStrategy;
@Inject @Inject
@ -69,7 +70,7 @@ public class BrokerServerView implements TimelineServerView
QueryToolChestWarehouse warehouse, QueryToolChestWarehouse warehouse,
ObjectMapper smileMapper, ObjectMapper smileMapper,
@Client HttpClient httpClient, @Client HttpClient httpClient,
ServerView baseView, ServerInventoryView baseView,
TierSelectorStrategy tierSelectorStrategy TierSelectorStrategy tierSelectorStrategy
) )
{ {
@ -89,14 +90,14 @@ public class BrokerServerView implements TimelineServerView
new ServerView.SegmentCallback() new ServerView.SegmentCallback()
{ {
@Override @Override
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{ {
serverAddedSegment(server, segment); serverAddedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
} }
@Override @Override
public ServerView.CallbackAction segmentRemoved(final DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment)
{ {
serverRemovedSegment(server, segment); serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
@ -159,12 +160,12 @@ public class BrokerServerView implements TimelineServerView
private QueryableDruidServer removeServer(DruidServer server) private QueryableDruidServer removeServer(DruidServer server)
{ {
for (DataSegment segment : server.getSegments().values()) { for (DataSegment segment : server.getSegments().values()) {
serverRemovedSegment(server, segment); serverRemovedSegment(server.getMetadata(), segment);
} }
return clients.remove(server.getName()); return clients.remove(server.getName());
} }
private void serverAddedSegment(final DruidServer server, final DataSegment segment) private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{ {
String segmentId = segment.getIdentifier(); String segmentId = segment.getIdentifier();
synchronized (lock) { synchronized (lock) {
@ -176,7 +177,7 @@ public class BrokerServerView implements TimelineServerView
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource()); VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
if (timeline == null) { if (timeline == null) {
timeline = new VersionedIntervalTimeline<String, ServerSelector>(Ordering.natural()); timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline); timelines.put(segment.getDataSource(), timeline);
} }
@ -186,13 +187,13 @@ public class BrokerServerView implements TimelineServerView
QueryableDruidServer queryableDruidServer = clients.get(server.getName()); QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) { if (queryableDruidServer == null) {
queryableDruidServer = addServer(server); queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
} }
selector.addServer(queryableDruidServer); selector.addServer(queryableDruidServer);
} }
} }
private void serverRemovedSegment(DruidServer server, DataSegment segment) private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{ {
String segmentId = segment.getIdentifier(); String segmentId = segment.getIdentifier();
final ServerSelector selector; final ServerSelector selector;

View File

@ -55,6 +55,7 @@ import io.druid.query.Result;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.MetricManipulatorFns; import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
@ -104,7 +105,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
new ServerView.BaseSegmentCallback() new ServerView.BaseSegmentCallback()
{ {
@Override @Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{ {
CachingClusteredClient.this.cache.close(segment.getIdentifier()); CachingClusteredClient.this.cache.close(segment.getIdentifier());
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;

View File

@ -125,6 +125,11 @@ public class DruidServer implements Comparable
return metadata.getTier(); return metadata.getTier();
} }
public boolean isAssignable()
{
return metadata.isAssignable();
}
@JsonProperty @JsonProperty
public int getPriority() public int getPriority()
{ {
@ -138,11 +143,6 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments); return Collections.unmodifiableMap(segments);
} }
public boolean isAssignable()
{
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
}
public DataSegment getSegment(String segmentName) public DataSegment getSegment(String segmentName)
{ {
return segments.get(segmentName); return segments.get(segmentName);

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
public class FilteredBatchServerViewProvider implements FilteredServerViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public BatchServerInventoryView get()
{
return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
}
}

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.google.common.base.Predicate;
import io.druid.timeline.DataSegment;
import java.util.concurrent.Executor;
public interface FilteredServerView
{
public void registerSegmentCallback(
Executor exec, ServerView.SegmentCallback callback, Predicate<DataSegment> filter
);
}

View File

@ -0,0 +1,34 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class),
@JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class)
})
public interface FilteredServerViewProvider extends Provider<FilteredServerView>
{
}

View File

@ -0,0 +1,50 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import javax.validation.constraints.NotNull;
public class FilteredSingleServerViewProvider implements FilteredServerViewProvider
{
@JacksonInject
@NotNull
private ZkPathsConfig zkPaths = null;
@JacksonInject
@NotNull
private CuratorFramework curator = null;
@JacksonInject
@NotNull
private ObjectMapper jsonMapper = null;
@Override
public SingleServerInventoryView get()
{
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
}
}

View File

@ -290,7 +290,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
@Override @Override
public CallbackAction apply(SegmentCallback input) public CallbackAction apply(SegmentCallback input)
{ {
return input.segmentAdded(container, inventory); return input.segmentAdded(container.getMetadata(), inventory);
} }
} }
); );
@ -319,7 +319,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
@Override @Override
public CallbackAction apply(SegmentCallback input) public CallbackAction apply(SegmentCallback input)
{ {
return input.segmentRemoved(container, segment); return input.segmentRemoved(container.getMetadata(), segment);
} }
} }
); );

View File

@ -19,6 +19,7 @@
package io.druid.client; package io.druid.client;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -72,7 +73,7 @@ public interface ServerView
* @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback
* should remain registered. * should remain registered.
*/ */
public CallbackAction segmentAdded(DruidServer server, DataSegment segment); public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment);
/** /**
* Called when a segment is removed from a server. * Called when a segment is removed from a server.
@ -89,19 +90,19 @@ public interface ServerView
* @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback
* should remain registered. * should remain registered.
*/ */
public CallbackAction segmentRemoved(DruidServer server, DataSegment segment); public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment);
} }
public static abstract class BaseSegmentCallback implements SegmentCallback public static abstract class BaseSegmentCallback implements SegmentCallback
{ {
@Override @Override
public CallbackAction segmentAdded(DruidServer server, DataSegment segment) public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{ {
return CallbackAction.CONTINUE; return CallbackAction.CONTINUE;
} }
@Override @Override
public CallbackAction segmentRemoved(DruidServer server, DataSegment segment) public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{ {
return CallbackAction.CONTINUE; return CallbackAction.CONTINUE;
} }

View File

@ -45,6 +45,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide
@Override @Override
public ServerInventoryView get() public ServerInventoryView get()
{ {
return new SingleServerInventoryView(zkPaths, curator, jsonMapper); return new SingleServerInventoryView(zkPaths, curator, jsonMapper, null);
} }
} }

View File

@ -21,25 +21,38 @@ package io.druid.client;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
/** /**
*/ */
@ManageLifecycle @ManageLifecycle
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerView
{ {
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
final private Map<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
private final Predicate<DataSegment> defaultFilter;
@Inject @Inject
public SingleServerInventoryView( public SingleServerInventoryView(
final ZkPathsConfig zkPaths, final ZkPathsConfig zkPaths,
final CuratorFramework curator, final CuratorFramework curator,
final ObjectMapper jsonMapper final ObjectMapper jsonMapper,
final Predicate<DataSegment> defaultFilter
) )
{ {
super( super(
@ -50,14 +63,22 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
jsonMapper, jsonMapper,
new TypeReference<DataSegment>(){} new TypeReference<DataSegment>(){}
); );
}
if(defaultFilter != null) {
this.defaultFilter = defaultFilter;
} else {
this.defaultFilter = Predicates.alwaysTrue();
} }
@Override @Override
protected DruidServer addInnerInventory( protected DruidServer addInnerInventory(
DruidServer container, String inventoryKey, DataSegment inventory DruidServer container, String inventoryKey, DataSegment inventory
) )
{ {
addSingleInventory(container, inventory); Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
if(predicate.apply(inventory)) {
addSingleInventory(container, inventory);
}
return container; return container;
} }
@ -75,4 +96,40 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
removeSingleInventory(container, inventoryKey); removeSingleInventory(container, inventoryKey);
return container; return container;
} }
@Override
public void registerSegmentCallback(
Executor exec, final SegmentCallback callback, Predicate<DataSegment> filter
)
{
segmentPredicates.put(callback, filter);
registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action = callback.segmentAdded(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
return action;
}
@Override
public CallbackAction segmentRemoved(
DruidServerMetadata server, DataSegment segment
)
{
final CallbackAction action = callback.segmentRemoved(server, segment);
if (action.equals(CallbackAction.UNREGISTER)) {
segmentPredicates.remove(callback);
}
return action;
}
}
);
}
} }

View File

@ -21,6 +21,8 @@ package io.druid.guice;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import io.druid.client.FilteredServerView;
import io.druid.client.FilteredServerViewProvider;
import io.druid.client.InventoryView; import io.druid.client.InventoryView;
import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryView;
import io.druid.client.ServerInventoryViewProvider; import io.druid.client.ServerInventoryViewProvider;
@ -34,8 +36,10 @@ public class ServerViewModule implements Module
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class);
binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(InventoryView.class).to(ServerInventoryView.class);
binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class);
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class);
} }
} }

View File

@ -2,6 +2,7 @@ package io.druid.segment.realtime.plumber;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -16,7 +17,7 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidServer; import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.common.guava.ThreadRenamingRunnable;
@ -43,6 +44,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
@ -82,7 +84,7 @@ public class RealtimePlumber implements Plumber
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
private final DataSegmentPusher dataSegmentPusher; private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher; private final SegmentPublisher segmentPublisher;
private final ServerView serverView; private final FilteredServerView serverView;
private final Object handoffCondition = new Object(); private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap(); private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>( private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
@ -104,7 +106,7 @@ public class RealtimePlumber implements Plumber
ExecutorService queryExecutorService, ExecutorService queryExecutorService,
DataSegmentPusher dataSegmentPusher, DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher, SegmentPublisher segmentPublisher,
ServerView serverView FilteredServerView serverView
) )
{ {
this.schema = schema; this.schema = schema;
@ -731,7 +733,7 @@ public class RealtimePlumber implements Plumber
new ServerView.BaseSegmentCallback() new ServerView.BaseSegmentCallback()
{ {
@Override @Override
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{ {
if (stopped) { if (stopped) {
log.info("Unregistering ServerViewCallback"); log.info("Unregistering ServerViewCallback");
@ -766,6 +768,26 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.CONTINUE; return ServerView.CallbackAction.CONTINUE;
} }
},
new Predicate<DataSegment>()
{
@Override
public boolean apply(final DataSegment segment)
{
return
schema.getDataSource().equals(segment.getDataSource())
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
} }
); );
} }

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Processing;
import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerFactoryConglomerate;
@ -48,7 +49,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final DataSegmentPusher dataSegmentPusher; private final DataSegmentPusher dataSegmentPusher;
private final DataSegmentAnnouncer segmentAnnouncer; private final DataSegmentAnnouncer segmentAnnouncer;
private final SegmentPublisher segmentPublisher; private final SegmentPublisher segmentPublisher;
private final ServerView serverView; private final FilteredServerView serverView;
private final ExecutorService queryExecutorService; private final ExecutorService queryExecutorService;
// Backwards compatible // Backwards compatible
@ -66,7 +67,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject DataSegmentPusher dataSegmentPusher,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject SegmentPublisher segmentPublisher, @JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject ServerView serverView, @JacksonInject FilteredServerView serverView,
@JacksonInject @Processing ExecutorService executorService, @JacksonInject @Processing ExecutorService executorService,
// Backwards compatible // Backwards compatible
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,

View File

@ -92,7 +92,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
{ {
@Override @Override
public ServerView.CallbackAction segmentAdded( public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment theSegment DruidServerMetadata server, DataSegment theSegment
) )
{ {
if (theSegment.equals(segment)) { if (theSegment.equals(segment)) {
@ -118,7 +118,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
{ {
@Override @Override
public ServerView.CallbackAction segmentRemoved( public ServerView.CallbackAction segmentRemoved(
DruidServer server, DataSegment theSegment DruidServerMetadata server, DataSegment theSegment
) )
{ {
if (theSegment.equals(segment)) { if (theSegment.equals(segment)) {

View File

@ -125,7 +125,7 @@ public class DruidClusterBridge
{ {
@Override @Override
public ServerView.CallbackAction segmentAdded( public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment segment DruidServerMetadata server, DataSegment segment
) )
{ {
try { try {
@ -147,7 +147,7 @@ public class DruidClusterBridge
} }
@Override @Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment) public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{ {
try { try {
synchronized (lock) { synchronized (lock) {
@ -172,7 +172,7 @@ public class DruidClusterBridge
{ {
try { try {
for (DataSegment dataSegment : server.getSegments().values()) { for (DataSegment dataSegment : server.getSegments().values()) {
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server); serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server.getMetadata());
} }
} }
catch (Exception e) { catch (Exception e) {
@ -370,7 +370,7 @@ public class DruidClusterBridge
} }
} }
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server) private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServerMetadata server)
throws IOException throws IOException
{ {
Integer count = segments.get(segment); Integer count = segments.get(segment);

View File

@ -87,6 +87,11 @@ public class DruidServerMetadata
return priority; return priority;
} }
public boolean isAssignable()
{
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {

View File

@ -131,7 +131,8 @@ public class BatchServerInventoryViewTest
} }
}, },
cf, cf,
jsonMapper jsonMapper,
null
); );
batchServerInventoryView.start(); batchServerInventoryView.start();

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime.plumber; package io.druid.segment.realtime.plumber;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -27,6 +28,7 @@ import com.metamx.common.Granularity;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.exception.FormattedException; import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.ParseSpec;
@ -48,11 +50,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -71,7 +70,7 @@ public class RealtimePlumberSchoolTest
private DataSegmentAnnouncer announcer; private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher; private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher; private DataSegmentPusher dataSegmentPusher;
private ServerView serverView; private FilteredServerView serverView;
private ServiceEmitter emitter; private ServiceEmitter emitter;
@Before @Before
@ -114,10 +113,11 @@ public class RealtimePlumberSchoolTest
segmentPublisher = EasyMock.createMock(SegmentPublisher.class); segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class); dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
serverView = EasyMock.createMock(ServerView.class); serverView = EasyMock.createMock(FilteredServerView.class);
serverView.registerSegmentCallback( serverView.registerSegmentCallback(
EasyMock.<Executor>anyObject(), EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject() EasyMock.<ServerView.SegmentCallback>anyObject(),
EasyMock.<Predicate<DataSegment>>anyObject()
); );
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();