mirror of https://github.com/apache/druid.git
Merge pull request #571 from metamx/filtered-serverview
FiltereServerView
This commit is contained in:
commit
af40b0c01d
|
@ -27,7 +27,7 @@ import com.google.common.collect.Multimap;
|
|||
import com.google.common.collect.Multimaps;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.indexing.common.actions.SegmentInsertAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
|
@ -66,7 +66,7 @@ public class TaskToolbox
|
|||
private final DataSegmentArchiver dataSegmentArchiver;
|
||||
private final DataSegmentMover dataSegmentMover;
|
||||
private final DataSegmentAnnouncer segmentAnnouncer;
|
||||
private final ServerView newSegmentServerView;
|
||||
private final FilteredServerView newSegmentServerView;
|
||||
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
|
||||
private final MonitorScheduler monitorScheduler;
|
||||
private final ExecutorService queryExecutorService;
|
||||
|
@ -84,7 +84,7 @@ public class TaskToolbox
|
|||
DataSegmentMover dataSegmentMover,
|
||||
DataSegmentArchiver dataSegmentArchiver,
|
||||
DataSegmentAnnouncer segmentAnnouncer,
|
||||
ServerView newSegmentServerView,
|
||||
FilteredServerView newSegmentServerView,
|
||||
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
|
||||
ExecutorService queryExecutorService,
|
||||
MonitorScheduler monitorScheduler,
|
||||
|
@ -151,7 +151,7 @@ public class TaskToolbox
|
|||
return segmentAnnouncer;
|
||||
}
|
||||
|
||||
public ServerView getNewSegmentServerView()
|
||||
public FilteredServerView getNewSegmentServerView()
|
||||
{
|
||||
return newSegmentServerView;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import io.druid.indexing.common.config.TaskConfig;
|
||||
|
@ -51,7 +51,7 @@ public class TaskToolboxFactory
|
|||
private final DataSegmentMover dataSegmentMover;
|
||||
private final DataSegmentArchiver dataSegmentArchiver;
|
||||
private final DataSegmentAnnouncer segmentAnnouncer;
|
||||
private final ServerView newSegmentServerView;
|
||||
private final FilteredServerView newSegmentServerView;
|
||||
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
|
||||
private final ExecutorService queryExecutorService;
|
||||
private final MonitorScheduler monitorScheduler;
|
||||
|
@ -68,7 +68,7 @@ public class TaskToolboxFactory
|
|||
DataSegmentMover dataSegmentMover,
|
||||
DataSegmentArchiver dataSegmentArchiver,
|
||||
DataSegmentAnnouncer segmentAnnouncer,
|
||||
ServerView newSegmentServerView,
|
||||
FilteredServerView newSegmentServerView,
|
||||
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
|
||||
@Processing ExecutorService queryExecutorService,
|
||||
MonitorScheduler monitorScheduler,
|
||||
|
|
|
@ -55,7 +55,6 @@ import io.druid.segment.realtime.SegmentPublisher;
|
|||
import io.druid.segment.realtime.plumber.Plumber;
|
||||
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
|
||||
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
|
||||
import io.druid.segment.realtime.plumber.Sink;
|
||||
import io.druid.segment.realtime.plumber.VersioningPolicy;
|
||||
import io.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
|
|
@ -21,33 +21,41 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ManageLifecycle
|
||||
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
|
||||
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>> implements FilteredServerView
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
|
||||
|
||||
final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
||||
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
||||
final private ConcurrentMap<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
|
||||
final private Predicate<DataSegment> defaultFilter;
|
||||
|
||||
@Inject
|
||||
public BatchServerInventoryView(
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ObjectMapper jsonMapper
|
||||
final ObjectMapper jsonMapper,
|
||||
final Predicate<DataSegment> defaultFilter
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -58,6 +66,9 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
jsonMapper,
|
||||
new TypeReference<Set<DataSegment>>(){}
|
||||
);
|
||||
|
||||
Preconditions.checkNotNull(defaultFilter);
|
||||
this.defaultFilter = defaultFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -67,8 +78,11 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
final Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
zNodes.put(inventoryKey, inventory);
|
||||
for (DataSegment segment : inventory) {
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
|
||||
|
||||
zNodes.put(inventoryKey, filteredInventory);
|
||||
for (DataSegment segment : filteredInventory) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
return container;
|
||||
|
@ -79,18 +93,21 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
DruidServer container, String inventoryKey, Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
Set<DataSegment> filteredInventory = Sets.filter(inventory, predicate);
|
||||
|
||||
Set<DataSegment> existing = zNodes.get(inventoryKey);
|
||||
if (existing == null) {
|
||||
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
|
||||
}
|
||||
|
||||
for (DataSegment segment : Sets.difference(inventory, existing)) {
|
||||
for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
for (DataSegment segment : Sets.difference(existing, inventory)) {
|
||||
for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
|
||||
removeSingleInventory(container, segment.getIdentifier());
|
||||
}
|
||||
zNodes.put(inventoryKey, inventory);
|
||||
zNodes.put(inventoryKey, filteredInventory);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
@ -111,4 +128,50 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
registerSegmentCallback(
|
||||
exec, new SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentRemoved(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,9 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
@ -43,8 +45,8 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
|
|||
private ObjectMapper jsonMapper = null;
|
||||
|
||||
@Override
|
||||
public ServerInventoryView get()
|
||||
public BatchServerInventoryView get()
|
||||
{
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper);
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysTrue());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import io.druid.query.QueryDataSource;
|
|||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryToolChestWarehouse;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
|
@ -61,7 +62,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
private final QueryToolChestWarehouse warehouse;
|
||||
private final ObjectMapper smileMapper;
|
||||
private final HttpClient httpClient;
|
||||
private final ServerView baseView;
|
||||
private final ServerInventoryView baseView;
|
||||
private final TierSelectorStrategy tierSelectorStrategy;
|
||||
|
||||
@Inject
|
||||
|
@ -69,7 +70,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
QueryToolChestWarehouse warehouse,
|
||||
ObjectMapper smileMapper,
|
||||
@Client HttpClient httpClient,
|
||||
ServerView baseView,
|
||||
ServerInventoryView baseView,
|
||||
TierSelectorStrategy tierSelectorStrategy
|
||||
)
|
||||
{
|
||||
|
@ -89,14 +90,14 @@ public class BrokerServerView implements TimelineServerView
|
|||
new ServerView.SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
|
||||
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
serverAddedSegment(server, segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(final DruidServer server, DataSegment segment)
|
||||
public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
serverRemovedSegment(server, segment);
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
|
@ -159,12 +160,12 @@ public class BrokerServerView implements TimelineServerView
|
|||
private QueryableDruidServer removeServer(DruidServer server)
|
||||
{
|
||||
for (DataSegment segment : server.getSegments().values()) {
|
||||
serverRemovedSegment(server, segment);
|
||||
serverRemovedSegment(server.getMetadata(), segment);
|
||||
}
|
||||
return clients.remove(server.getName());
|
||||
}
|
||||
|
||||
private void serverAddedSegment(final DruidServer server, final DataSegment segment)
|
||||
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
|
||||
{
|
||||
String segmentId = segment.getIdentifier();
|
||||
synchronized (lock) {
|
||||
|
@ -176,7 +177,7 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
VersionedIntervalTimeline<String, ServerSelector> timeline = timelines.get(segment.getDataSource());
|
||||
if (timeline == null) {
|
||||
timeline = new VersionedIntervalTimeline<String, ServerSelector>(Ordering.natural());
|
||||
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
|
||||
timelines.put(segment.getDataSource(), timeline);
|
||||
}
|
||||
|
||||
|
@ -186,13 +187,13 @@ public class BrokerServerView implements TimelineServerView
|
|||
|
||||
QueryableDruidServer queryableDruidServer = clients.get(server.getName());
|
||||
if (queryableDruidServer == null) {
|
||||
queryableDruidServer = addServer(server);
|
||||
queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
|
||||
}
|
||||
selector.addServer(queryableDruidServer);
|
||||
}
|
||||
}
|
||||
|
||||
private void serverRemovedSegment(DruidServer server, DataSegment segment)
|
||||
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
String segmentId = segment.getIdentifier();
|
||||
final ServerSelector selector;
|
||||
|
|
|
@ -55,6 +55,7 @@ import io.druid.query.Result;
|
|||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.aggregation.MetricManipulatorFns;
|
||||
import io.druid.query.spec.MultipleSpecificSegmentSpec;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
|
@ -104,7 +105,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
|
|||
new ServerView.BaseSegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
CachingClusteredClient.this.cache.close(segment.getIdentifier());
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
|
|
|
@ -125,6 +125,11 @@ public class DruidServer implements Comparable
|
|||
return metadata.getTier();
|
||||
}
|
||||
|
||||
public boolean isAssignable()
|
||||
{
|
||||
return metadata.isAssignable();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getPriority()
|
||||
{
|
||||
|
@ -138,11 +143,6 @@ public class DruidServer implements Comparable
|
|||
return Collections.unmodifiableMap(segments);
|
||||
}
|
||||
|
||||
public boolean isAssignable()
|
||||
{
|
||||
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
|
||||
}
|
||||
|
||||
public DataSegment getSegment(String segmentName)
|
||||
{
|
||||
return segments.get(segmentName);
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class FilteredBatchServerViewProvider implements FilteredServerViewProvider
|
||||
{
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ZkPathsConfig zkPaths = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private CuratorFramework curator = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ObjectMapper jsonMapper = null;
|
||||
|
||||
@Override
|
||||
public BatchServerInventoryView get()
|
||||
{
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public interface FilteredServerView
|
||||
{
|
||||
public void registerSegmentCallback(
|
||||
Executor exec, ServerView.SegmentCallback callback, Predicate<DataSegment> filter
|
||||
);
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.inject.Provider;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SingleServerInventoryProvider.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class),
|
||||
@JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class)
|
||||
})
|
||||
public interface FilteredServerViewProvider extends Provider<FilteredServerView>
|
||||
{
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class FilteredSingleServerViewProvider implements FilteredServerViewProvider
|
||||
{
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ZkPathsConfig zkPaths = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private CuratorFramework curator = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ObjectMapper jsonMapper = null;
|
||||
|
||||
@Override
|
||||
public SingleServerInventoryView get()
|
||||
{
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
|
||||
}
|
||||
}
|
|
@ -290,7 +290,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
|
|||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
{
|
||||
return input.segmentAdded(container, inventory);
|
||||
return input.segmentAdded(container.getMetadata(), inventory);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -319,7 +319,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
|
|||
@Override
|
||||
public CallbackAction apply(SegmentCallback input)
|
||||
{
|
||||
return input.segmentRemoved(container, segment);
|
||||
return input.segmentRemoved(container.getMetadata(), segment);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.client;
|
||||
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -72,7 +73,7 @@ public interface ServerView
|
|||
* @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback
|
||||
* should remain registered.
|
||||
*/
|
||||
public CallbackAction segmentAdded(DruidServer server, DataSegment segment);
|
||||
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment);
|
||||
|
||||
/**
|
||||
* Called when a segment is removed from a server.
|
||||
|
@ -89,19 +90,19 @@ public interface ServerView
|
|||
* @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback
|
||||
* should remain registered.
|
||||
*/
|
||||
public CallbackAction segmentRemoved(DruidServer server, DataSegment segment);
|
||||
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment);
|
||||
}
|
||||
|
||||
public static abstract class BaseSegmentCallback implements SegmentCallback
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(DruidServer server, DataSegment segment)
|
||||
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
return CallbackAction.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
|
||||
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
return CallbackAction.CONTINUE;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,9 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
@ -45,6 +47,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide
|
|||
@Override
|
||||
public ServerInventoryView get()
|
||||
{
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper);
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysTrue());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,25 +21,37 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ManageLifecycle
|
||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerView
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
|
||||
|
||||
final private ConcurrentMap<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
|
||||
private final Predicate<DataSegment> defaultFilter;
|
||||
|
||||
@Inject
|
||||
public SingleServerInventoryView(
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ObjectMapper jsonMapper
|
||||
final ObjectMapper jsonMapper,
|
||||
final Predicate<DataSegment> defaultFilter
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -50,6 +62,9 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
jsonMapper,
|
||||
new TypeReference<DataSegment>(){}
|
||||
);
|
||||
|
||||
Preconditions.checkNotNull(defaultFilter);
|
||||
this.defaultFilter = defaultFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,7 +72,10 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
DruidServer container, String inventoryKey, DataSegment inventory
|
||||
)
|
||||
{
|
||||
addSingleInventory(container, inventory);
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
if(predicate.apply(inventory)) {
|
||||
addSingleInventory(container, inventory);
|
||||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -75,4 +93,52 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
removeSingleInventory(container, inventoryKey);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
registerSegmentCallback(
|
||||
exec, new SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentRemoved(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package io.druid.guice;
|
|||
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.FilteredServerViewProvider;
|
||||
import io.druid.client.InventoryView;
|
||||
import io.druid.client.ServerInventoryView;
|
||||
import io.druid.client.ServerInventoryViewProvider;
|
||||
|
@ -34,8 +36,10 @@ public class ServerViewModule implements Module
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
|
||||
JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class);
|
||||
binder.bind(InventoryView.class).to(ServerInventoryView.class);
|
||||
binder.bind(ServerView.class).to(ServerInventoryView.class);
|
||||
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
|
||||
binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package io.druid.segment.realtime.plumber;
|
|||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -16,7 +17,7 @@ import com.metamx.common.guava.FunctionalIterable;
|
|||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.common.guava.ThreadRenamingCallable;
|
||||
import io.druid.common.guava.ThreadRenamingRunnable;
|
||||
|
@ -43,6 +44,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
|
|||
import io.druid.segment.realtime.FireHydrant;
|
||||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import io.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.TimelineObjectHolder;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
|
@ -82,7 +84,7 @@ public class RealtimePlumber implements Plumber
|
|||
private final ExecutorService queryExecutorService;
|
||||
private final DataSegmentPusher dataSegmentPusher;
|
||||
private final SegmentPublisher segmentPublisher;
|
||||
private final ServerView serverView;
|
||||
private final FilteredServerView serverView;
|
||||
private final Object handoffCondition = new Object();
|
||||
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
|
||||
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
|
||||
|
@ -104,7 +106,7 @@ public class RealtimePlumber implements Plumber
|
|||
ExecutorService queryExecutorService,
|
||||
DataSegmentPusher dataSegmentPusher,
|
||||
SegmentPublisher segmentPublisher,
|
||||
ServerView serverView
|
||||
FilteredServerView serverView
|
||||
)
|
||||
{
|
||||
this.schema = schema;
|
||||
|
@ -731,7 +733,7 @@ public class RealtimePlumber implements Plumber
|
|||
new ServerView.BaseSegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment segment)
|
||||
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
if (stopped) {
|
||||
log.info("Unregistering ServerViewCallback");
|
||||
|
@ -766,6 +768,26 @@ public class RealtimePlumber implements Plumber
|
|||
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
},
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(final DataSegment segment)
|
||||
{
|
||||
return
|
||||
schema.getDataSource().equals(segment.getDataSource())
|
||||
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
|
||||
&& Iterables.any(
|
||||
sinks.keySet(), new Predicate<Long>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Long sinkKey)
|
||||
{
|
||||
return segment.getInterval().contains(sinkKey);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.metamx.common.Granularity;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
|
@ -48,7 +49,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
private final DataSegmentPusher dataSegmentPusher;
|
||||
private final DataSegmentAnnouncer segmentAnnouncer;
|
||||
private final SegmentPublisher segmentPublisher;
|
||||
private final ServerView serverView;
|
||||
private final FilteredServerView serverView;
|
||||
private final ExecutorService queryExecutorService;
|
||||
|
||||
// Backwards compatible
|
||||
|
@ -66,7 +67,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
@JacksonInject DataSegmentPusher dataSegmentPusher,
|
||||
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
|
||||
@JacksonInject SegmentPublisher segmentPublisher,
|
||||
@JacksonInject ServerView serverView,
|
||||
@JacksonInject FilteredServerView serverView,
|
||||
@JacksonInject @Processing ExecutorService executorService,
|
||||
// Backwards compatible
|
||||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
|
|
|
@ -92,7 +92,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
|||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(
|
||||
DruidServer server, DataSegment theSegment
|
||||
DruidServerMetadata server, DataSegment theSegment
|
||||
)
|
||||
{
|
||||
if (theSegment.equals(segment)) {
|
||||
|
@ -118,7 +118,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
|
|||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(
|
||||
DruidServer server, DataSegment theSegment
|
||||
DruidServerMetadata server, DataSegment theSegment
|
||||
)
|
||||
{
|
||||
if (theSegment.equals(segment)) {
|
||||
|
|
|
@ -125,7 +125,7 @@ public class DruidClusterBridge
|
|||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(
|
||||
DruidServer server, DataSegment segment
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
try {
|
||||
|
@ -147,7 +147,7 @@ public class DruidClusterBridge
|
|||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
|
||||
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
|
||||
{
|
||||
try {
|
||||
synchronized (lock) {
|
||||
|
@ -172,7 +172,7 @@ public class DruidClusterBridge
|
|||
{
|
||||
try {
|
||||
for (DataSegment dataSegment : server.getSegments().values()) {
|
||||
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server);
|
||||
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server.getMetadata());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -370,7 +370,7 @@ public class DruidClusterBridge
|
|||
}
|
||||
}
|
||||
|
||||
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server)
|
||||
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServerMetadata server)
|
||||
throws IOException
|
||||
{
|
||||
Integer count = segments.get(segment);
|
||||
|
|
|
@ -87,6 +87,11 @@ public class DruidServerMetadata
|
|||
return priority;
|
||||
}
|
||||
|
||||
public boolean isAssignable()
|
||||
{
|
||||
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -20,7 +20,8 @@
|
|||
package io.druid.client.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
|
@ -28,6 +29,7 @@ import com.google.common.util.concurrent.MoreExecutors;
|
|||
import com.metamx.common.ISE;
|
||||
import io.druid.client.BatchServerInventoryView;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -36,17 +38,23 @@ import io.druid.server.coordination.DruidServerMetadata;
|
|||
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.curator.test.TestingCluster;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.LogicalOperator;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Comparator;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -55,6 +63,8 @@ import java.util.concurrent.TimeUnit;
|
|||
public class BatchServerInventoryViewTest
|
||||
{
|
||||
private static final String testBasePath = "/test";
|
||||
public static final DateTime SEGMENT_INTERVAL_START = new DateTime("2013-01-01");
|
||||
public static final int INITIAL_SEGMENTS = 100;
|
||||
|
||||
private TestingCluster testingCluster;
|
||||
private CuratorFramework cf;
|
||||
|
@ -63,6 +73,10 @@ public class BatchServerInventoryViewTest
|
|||
private BatchDataSegmentAnnouncer segmentAnnouncer;
|
||||
private Set<DataSegment> testSegments;
|
||||
private BatchServerInventoryView batchServerInventoryView;
|
||||
private BatchServerInventoryView filteredBatchServerInventoryView;
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
|
@ -117,7 +131,7 @@ public class BatchServerInventoryViewTest
|
|||
segmentAnnouncer.start();
|
||||
|
||||
testSegments = Sets.newHashSet();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
for (int i = 0; i < INITIAL_SEGMENTS; i++) {
|
||||
testSegments.add(makeSegment(i));
|
||||
}
|
||||
|
||||
|
@ -131,16 +145,41 @@ public class BatchServerInventoryViewTest
|
|||
}
|
||||
},
|
||||
cf,
|
||||
jsonMapper
|
||||
jsonMapper,
|
||||
Predicates.<DataSegment>alwaysTrue()
|
||||
);
|
||||
|
||||
batchServerInventoryView.start();
|
||||
|
||||
filteredBatchServerInventoryView = new BatchServerInventoryView(
|
||||
new ZkPathsConfig()
|
||||
{
|
||||
@Override
|
||||
public String getZkBasePath()
|
||||
{
|
||||
return testBasePath;
|
||||
}
|
||||
},
|
||||
cf,
|
||||
jsonMapper,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
filteredBatchServerInventoryView.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
batchServerInventoryView.stop();
|
||||
filteredBatchServerInventoryView.stop();
|
||||
segmentAnnouncer.stop();
|
||||
announcer.stop();
|
||||
cf.close();
|
||||
|
@ -152,7 +191,7 @@ public class BatchServerInventoryViewTest
|
|||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
@ -167,7 +206,7 @@ public class BatchServerInventoryViewTest
|
|||
testSegments.add(segment1);
|
||||
testSegments.add(segment2);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
|
||||
|
@ -176,32 +215,131 @@ public class BatchServerInventoryViewTest
|
|||
testSegments.remove(segment1);
|
||||
testSegments.remove(segment2);
|
||||
|
||||
waitForSync();
|
||||
waitForSync(batchServerInventoryView, testSegments);
|
||||
|
||||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilter() throws Exception
|
||||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
// segment outside the range of default filter
|
||||
DataSegment segment1 = makeSegment(101);
|
||||
segmentAnnouncer.announceSegment(segment1);
|
||||
testSegments.add(segment1);
|
||||
|
||||
exception.expect(ISE.class);
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilterCallback() throws Exception
|
||||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class);
|
||||
Comparator<DataSegment> dataSegmentComparator = new Comparator<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DataSegment o1, DataSegment o2)
|
||||
{
|
||||
return o1.getInterval().equals(o2.getInterval()) ? 0 : -1;
|
||||
}
|
||||
};
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentAdded(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andReturn(ServerView.CallbackAction.CONTINUE)
|
||||
.times(1);
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentRemoved(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andReturn(ServerView.CallbackAction.CONTINUE)
|
||||
.times(1);
|
||||
|
||||
|
||||
EasyMock.replay(callback);
|
||||
|
||||
filteredBatchServerInventoryView.registerSegmentCallback(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
callback,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2);
|
||||
segmentAnnouncer.announceSegment(segment2);
|
||||
testSegments.add(segment2);
|
||||
|
||||
DataSegment oldSegment = makeSegment(-1);
|
||||
segmentAnnouncer.announceSegment(oldSegment);
|
||||
testSegments.add(oldSegment);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(oldSegment);
|
||||
testSegments.remove(oldSegment);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(segment2);
|
||||
testSegments.remove(segment2);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
EasyMock.verify(callback);
|
||||
}
|
||||
|
||||
private DataSegment makeSegment(int offset)
|
||||
{
|
||||
return DataSegment.builder()
|
||||
.dataSource("foo")
|
||||
.interval(
|
||||
new Interval(
|
||||
new DateTime("2013-01-01").plusDays(offset),
|
||||
new DateTime("2013-01-02").plusDays(offset)
|
||||
SEGMENT_INTERVAL_START.plusDays(offset),
|
||||
SEGMENT_INTERVAL_START.plusDays(offset + 1)
|
||||
)
|
||||
)
|
||||
.version(new DateTime().toString())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void waitForSync() throws Exception
|
||||
private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception
|
||||
{
|
||||
Stopwatch stopwatch = new Stopwatch().start();
|
||||
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|
||||
|| Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
|
||||
Thread.sleep(500);
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 2000) {
|
||||
throw new ISE("BatchServerInventoryView is not updating");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.segment.realtime.plumber;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.Files;
|
||||
|
@ -27,6 +28,7 @@ import com.metamx.common.Granularity;
|
|||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.exception.FormattedException;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.ParseSpec;
|
||||
|
@ -48,11 +50,8 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
|
|||
import io.druid.segment.realtime.SegmentPublisher;
|
||||
import io.druid.server.coordination.DataSegmentAnnouncer;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -71,7 +70,7 @@ public class RealtimePlumberSchoolTest
|
|||
private DataSegmentAnnouncer announcer;
|
||||
private SegmentPublisher segmentPublisher;
|
||||
private DataSegmentPusher dataSegmentPusher;
|
||||
private ServerView serverView;
|
||||
private FilteredServerView serverView;
|
||||
private ServiceEmitter emitter;
|
||||
|
||||
@Before
|
||||
|
@ -114,10 +113,11 @@ public class RealtimePlumberSchoolTest
|
|||
segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
|
||||
dataSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
|
||||
|
||||
serverView = EasyMock.createMock(ServerView.class);
|
||||
serverView = EasyMock.createMock(FilteredServerView.class);
|
||||
serverView.registerSegmentCallback(
|
||||
EasyMock.<Executor>anyObject(),
|
||||
EasyMock.<ServerView.SegmentCallback>anyObject()
|
||||
EasyMock.<ServerView.SegmentCallback>anyObject(),
|
||||
EasyMock.<Predicate<DataSegment>>anyObject()
|
||||
);
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
|
||||
|
|
Loading…
Reference in New Issue