mirror of https://github.com/apache/druid.git
Remove FilteredServerView
This commit is contained in:
parent
9491e8de3b
commit
a32906c7fd
|
@ -28,12 +28,10 @@ import com.google.common.collect.Multimap;
|
|||
import com.google.common.collect.Multimaps;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.metrics.MonitorScheduler;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.indexing.common.actions.SegmentInsertAction;
|
||||
import io.druid.indexing.common.actions.TaskActionClient;
|
||||
import io.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import io.druid.indexing.common.config.TaskConfig;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
|
|
|
@ -19,34 +19,21 @@
|
|||
|
||||
package io.druid.indexing.test;
|
||||
|
||||
import com.google.api.client.util.Lists;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.Pair;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class TestServerView implements FilteredServerView, ServerView.SegmentCallback
|
||||
public class TestServerView implements ServerView.SegmentCallback
|
||||
{
|
||||
final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> callbacks = Maps.newConcurrentMap();
|
||||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
final Executor exec,
|
||||
final ServerView.SegmentCallback callback,
|
||||
final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
callbacks.put(callback, Pair.of(filter, exec));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerView.CallbackAction segmentAdded(
|
||||
final DruidServerMetadata server,
|
||||
|
|
|
@ -21,42 +21,33 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ManageLifecycle
|
||||
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>> implements FilteredServerView
|
||||
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
|
||||
|
||||
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
||||
final private ConcurrentMap<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
|
||||
final private Predicate<DataSegment> defaultFilter;
|
||||
|
||||
@Inject
|
||||
public BatchServerInventoryView(
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ObjectMapper jsonMapper,
|
||||
final Predicate<DataSegment> defaultFilter
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -65,11 +56,10 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
zkPaths.getLiveSegmentsPath(),
|
||||
curator,
|
||||
jsonMapper,
|
||||
new TypeReference<Set<DataSegment>>(){}
|
||||
new TypeReference<Set<DataSegment>>()
|
||||
{
|
||||
}
|
||||
);
|
||||
|
||||
Preconditions.checkNotNull(defaultFilter);
|
||||
this.defaultFilter = defaultFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,12 +69,8 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
final Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
// make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
|
||||
Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));
|
||||
|
||||
zNodes.put(inventoryKey, filteredInventory);
|
||||
for (DataSegment segment : filteredInventory) {
|
||||
zNodes.put(inventoryKey, inventory);
|
||||
for (DataSegment segment : inventory) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
return container;
|
||||
|
@ -95,22 +81,18 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
DruidServer container, String inventoryKey, Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
// make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
|
||||
Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.filter(inventory, predicate));
|
||||
|
||||
Set<DataSegment> existing = zNodes.get(inventoryKey);
|
||||
if (existing == null) {
|
||||
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
|
||||
}
|
||||
|
||||
for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
|
||||
for (DataSegment segment : Sets.difference(inventory, existing)) {
|
||||
addSingleInventory(container, segment);
|
||||
}
|
||||
for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
|
||||
for (DataSegment segment : Sets.difference(existing, inventory)) {
|
||||
removeSingleInventory(container, segment.getIdentifier());
|
||||
}
|
||||
zNodes.put(inventoryKey, filteredInventory);
|
||||
zNodes.put(inventoryKey, inventory);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
@ -131,56 +113,4 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
}
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
registerSegmentCallback(
|
||||
exec, new SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentRemoved(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentViewInitialized()
|
||||
{
|
||||
return callback.segmentViewInitialized();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,6 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
|
|||
@Override
|
||||
public BatchServerInventoryView get()
|
||||
{
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysTrue());
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class FilteredBatchServerViewProvider implements FilteredServerViewProvider
|
||||
{
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ZkPathsConfig zkPaths = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private CuratorFramework curator = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ObjectMapper jsonMapper = null;
|
||||
|
||||
@Override
|
||||
public BatchServerInventoryView get()
|
||||
{
|
||||
return new BatchServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public interface FilteredServerView
|
||||
{
|
||||
public void registerSegmentCallback(
|
||||
Executor exec, ServerView.SegmentCallback callback, Predicate<DataSegment> filter
|
||||
);
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.inject.Provider;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerViewProvider.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerViewProvider.class),
|
||||
@JsonSubTypes.Type(name = "batch", value = FilteredBatchServerViewProvider.class)
|
||||
})
|
||||
public interface FilteredServerViewProvider extends Provider<FilteredServerView>
|
||||
{
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.client;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicates;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
public class FilteredSingleServerViewProvider implements FilteredServerViewProvider
|
||||
{
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ZkPathsConfig zkPaths = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private CuratorFramework curator = null;
|
||||
|
||||
@JacksonInject
|
||||
@NotNull
|
||||
private ObjectMapper jsonMapper = null;
|
||||
|
||||
@Override
|
||||
public SingleServerInventoryView get()
|
||||
{
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysFalse());
|
||||
}
|
||||
}
|
|
@ -47,6 +47,6 @@ public class SingleServerInventoryProvider implements ServerInventoryViewProvide
|
|||
@Override
|
||||
public ServerInventoryView get()
|
||||
{
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper, Predicates.<DataSegment>alwaysTrue());
|
||||
return new SingleServerInventoryView(zkPaths, curator, jsonMapper);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,37 +21,25 @@ package io.druid.client;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.server.coordination.DruidServerMetadata;
|
||||
import io.druid.server.initialization.ZkPathsConfig;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
*/
|
||||
@ManageLifecycle
|
||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerView
|
||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
|
||||
|
||||
final private ConcurrentMap<SegmentCallback, Predicate<DataSegment>> segmentPredicates = new MapMaker().makeMap();
|
||||
private final Predicate<DataSegment> defaultFilter;
|
||||
|
||||
@Inject
|
||||
public SingleServerInventoryView(
|
||||
final ZkPathsConfig zkPaths,
|
||||
final CuratorFramework curator,
|
||||
final ObjectMapper jsonMapper,
|
||||
final Predicate<DataSegment> defaultFilter
|
||||
final ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -60,11 +48,10 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
zkPaths.getServedSegmentsPath(),
|
||||
curator,
|
||||
jsonMapper,
|
||||
new TypeReference<DataSegment>(){}
|
||||
new TypeReference<DataSegment>()
|
||||
{
|
||||
}
|
||||
);
|
||||
|
||||
Preconditions.checkNotNull(defaultFilter);
|
||||
this.defaultFilter = defaultFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,10 +59,7 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
DruidServer container, String inventoryKey, DataSegment inventory
|
||||
)
|
||||
{
|
||||
Predicate<DataSegment> predicate = Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
|
||||
if(predicate.apply(inventory)) {
|
||||
addSingleInventory(container, inventory);
|
||||
}
|
||||
addSingleInventory(container, inventory);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -93,58 +77,4 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
removeSingleInventory(container, inventoryKey);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
registerSegmentCallback(
|
||||
exec, new SegmentCallback()
|
||||
{
|
||||
@Override
|
||||
public CallbackAction segmentAdded(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentRemoved(
|
||||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackAction segmentViewInitialized()
|
||||
{
|
||||
return callback.segmentViewInitialized();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ package io.druid.guice;
|
|||
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.FilteredServerViewProvider;
|
||||
import io.druid.client.InventoryView;
|
||||
import io.druid.client.ServerInventoryView;
|
||||
import io.druid.client.ServerInventoryViewProvider;
|
||||
|
@ -36,10 +34,8 @@ public class ServerViewModule implements Module
|
|||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class);
|
||||
JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerViewProvider.class);
|
||||
binder.bind(InventoryView.class).to(ServerInventoryView.class);
|
||||
binder.bind(ServerView.class).to(ServerInventoryView.class);
|
||||
binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class);
|
||||
binder.bind(FilteredServerView.class).toProvider(FilteredServerViewProvider.class).in(ManageLifecycle.class);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.cache.Cache;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.guice.annotations.Processing;
|
||||
|
|
|
@ -319,8 +319,7 @@ public class BrokerServerViewTest extends CuratorTestBase
|
|||
baseView = new BatchServerInventoryView(
|
||||
zkPathsConfig,
|
||||
curator,
|
||||
jsonMapper,
|
||||
Predicates.<DataSegment>alwaysTrue()
|
||||
jsonMapper
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -306,8 +306,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase
|
|||
baseView = new BatchServerInventoryView(
|
||||
zkPathsConfig,
|
||||
curator,
|
||||
jsonMapper,
|
||||
Predicates.<DataSegment>alwaysTrue()
|
||||
jsonMapper
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
package io.druid.client.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -33,7 +31,6 @@ import com.google.common.util.concurrent.MoreExecutors;
|
|||
import com.metamx.common.ISE;
|
||||
import io.druid.client.BatchServerInventoryView;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.curator.PotentiallyGzippedCompressionProvider;
|
||||
import io.druid.curator.announcement.Announcer;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -47,9 +44,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
|
|||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.apache.curator.test.TestingCluster;
|
||||
import org.apache.curator.test.Timing;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.IAnswer;
|
||||
import org.easymock.LogicalOperator;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
|
@ -59,9 +53,7 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -159,8 +151,7 @@ public class BatchServerInventoryViewTest
|
|||
}
|
||||
},
|
||||
cf,
|
||||
jsonMapper,
|
||||
Predicates.<DataSegment>alwaysTrue()
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
batchServerInventoryView.start();
|
||||
|
@ -175,16 +166,9 @@ public class BatchServerInventoryViewTest
|
|||
}
|
||||
},
|
||||
cf,
|
||||
jsonMapper,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
|
||||
}
|
||||
}
|
||||
){
|
||||
jsonMapper
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected DruidServer addInnerInventory(
|
||||
DruidServer container, String inventoryKey, Set<DataSegment> inventory
|
||||
|
@ -243,122 +227,6 @@ public class BatchServerInventoryViewTest
|
|||
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilter() throws Exception
|
||||
{
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
int prevUpdateCount = inventoryUpdateCounter.get();
|
||||
// segment outside the range of default filter
|
||||
DataSegment segment1 = makeSegment(101);
|
||||
segmentAnnouncer.announceSegment(segment1);
|
||||
testSegments.add(segment1);
|
||||
|
||||
waitForUpdateEvents(prevUpdateCount + 1);
|
||||
Assert.assertNull(
|
||||
Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory())
|
||||
.getSegment(segment1.getIdentifier())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRunWithFilterCallback() throws Exception
|
||||
{
|
||||
final CountDownLatch removeCallbackLatch = new CountDownLatch(1);
|
||||
|
||||
segmentAnnouncer.announceSegments(testSegments);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
|
||||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class);
|
||||
Comparator<DataSegment> dataSegmentComparator = new Comparator<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DataSegment o1, DataSegment o2)
|
||||
{
|
||||
return o1.getInterval().equals(o2.getInterval()) ? 0 : -1;
|
||||
}
|
||||
};
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentAdded(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andReturn(ServerView.CallbackAction.CONTINUE)
|
||||
.times(1);
|
||||
|
||||
EasyMock
|
||||
.expect(
|
||||
callback.segmentRemoved(
|
||||
EasyMock.<DruidServerMetadata>anyObject(),
|
||||
EasyMock.cmp(makeSegment(INITIAL_SEGMENTS + 2), dataSegmentComparator, LogicalOperator.EQUAL)
|
||||
)
|
||||
)
|
||||
.andAnswer(
|
||||
new IAnswer<ServerView.CallbackAction>()
|
||||
{
|
||||
@Override
|
||||
public ServerView.CallbackAction answer() throws Throwable
|
||||
{
|
||||
removeCallbackLatch.countDown();
|
||||
return ServerView.CallbackAction.CONTINUE;
|
||||
}
|
||||
}
|
||||
)
|
||||
.times(1);
|
||||
|
||||
|
||||
EasyMock.replay(callback);
|
||||
|
||||
filteredBatchServerInventoryView.registerSegmentCallback(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
callback,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getInterval().getStart().equals(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS + 2));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
DataSegment segment2 = makeSegment(INITIAL_SEGMENTS + 2);
|
||||
segmentAnnouncer.announceSegment(segment2);
|
||||
testSegments.add(segment2);
|
||||
|
||||
DataSegment oldSegment = makeSegment(-1);
|
||||
segmentAnnouncer.announceSegment(oldSegment);
|
||||
testSegments.add(oldSegment);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(oldSegment);
|
||||
testSegments.remove(oldSegment);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
|
||||
segmentAnnouncer.unannounceSegment(segment2);
|
||||
testSegments.remove(segment2);
|
||||
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
timing.forWaiting().awaitLatch(removeCallbackLatch);
|
||||
|
||||
EasyMock.verify(callback);
|
||||
}
|
||||
|
||||
private DataSegment makeSegment(int offset)
|
||||
{
|
||||
return DataSegment.builder()
|
||||
|
@ -395,7 +263,11 @@ public class BatchServerInventoryViewTest
|
|||
while (inventoryUpdateCounter.get() != count) {
|
||||
Thread.sleep(100);
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
|
||||
throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get());
|
||||
throw new ISE(
|
||||
"BatchServerInventoryView is not updating counter expected[%d] value[%d]",
|
||||
count,
|
||||
inventoryUpdateCounter.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -459,7 +331,7 @@ public class BatchServerInventoryViewTest
|
|||
List<DataSegment> segments = new ArrayList<DataSegment>();
|
||||
try {
|
||||
for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) {
|
||||
segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j));
|
||||
segments.add(makeSegment(INITIAL_SEGMENTS + ii + numThreads * j));
|
||||
}
|
||||
latch.countDown();
|
||||
latch.await();
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.segment.realtime.plumber;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -28,8 +27,6 @@ import com.google.common.io.Files;
|
|||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.metamx.common.Granularity;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import io.druid.client.FilteredServerView;
|
||||
import io.druid.client.ServerView;
|
||||
import io.druid.client.cache.MapCache;
|
||||
import io.druid.data.input.Committer;
|
||||
import io.druid.data.input.InputRow;
|
||||
|
|
Loading…
Reference in New Issue